From b5074fd597a5dea8e257f3d58b29bd5bf14a8a2e Mon Sep 17 00:00:00 2001 From: Josh Slocum Date: Fri, 8 Oct 2021 17:35:36 -0500 Subject: [PATCH] Reworked all of the system data to encode granule data more efficiently for persistence --- fdbclient/BlobGranuleCommon.h | 12 + fdbclient/BlobWorkerInterface.h | 24 +- fdbclient/ClientKnobs.cpp | 1 + fdbclient/NativeAPI.actor.cpp | 8 +- fdbclient/SystemData.cpp | 190 +++- fdbclient/SystemData.h | 35 +- fdbserver/BlobManager.actor.cpp | 59 +- fdbserver/BlobWorker.actor.cpp | 896 +++++++++++------- .../workloads/BlobGranuleVerifier.actor.cpp | 29 +- flow/ProtocolVersion.h | 1 + 10 files changed, 837 insertions(+), 418 deletions(-) diff --git a/fdbclient/BlobGranuleCommon.h b/fdbclient/BlobGranuleCommon.h index 5b162a8b50..166064861e 100644 --- a/fdbclient/BlobGranuleCommon.h +++ b/fdbclient/BlobGranuleCommon.h @@ -117,4 +117,16 @@ struct BlobGranuleChunkRef { }; enum BlobGranuleSplitState { Unknown = 0, Started = 1, Assigned = 2, Done = 3 }; + +struct BlobGranuleHistoryValue { + constexpr static FileIdentifier file_identifier = 991434; + UID granuleID; + VectorRef> parentGranules; + + template + void serialize(Ar& ar) { + serializer(ar, granuleID, parentGranules); + } +}; + #endif \ No newline at end of file diff --git a/fdbclient/BlobWorkerInterface.h b/fdbclient/BlobWorkerInterface.h index 370b931835..3015475b3a 100644 --- a/fdbclient/BlobWorkerInterface.h +++ b/fdbclient/BlobWorkerInterface.h @@ -155,16 +155,34 @@ struct GranuleStatusReply : public ReplyPromiseStreamReply { bool doSplit; int64_t epoch; int64_t seqno; + UID granuleID; + Version startVersion; + Version latestVersion; GranuleStatusReply() {} - explicit GranuleStatusReply(KeyRange range, bool doSplit, int64_t epoch, int64_t seqno) - : granuleRange(range), doSplit(doSplit), epoch(epoch), seqno(seqno) {} + explicit GranuleStatusReply(KeyRange range, + bool doSplit, + int64_t epoch, + int64_t seqno, + UID granuleID, + Version startVersion, + Version latestVersion) + : granuleRange(range), doSplit(doSplit), epoch(epoch), seqno(seqno), granuleID(granuleID), + startVersion(startVersion), latestVersion(latestVersion) {} int expectedSize() const { return sizeof(GranuleStatusReply) + granuleRange.expectedSize(); } template void serialize(Ar& ar) { - serializer(ar, ReplyPromiseStreamReply::acknowledgeToken, ReplyPromiseStreamReply::sequence, granuleRange, doSplit, epoch, seqno); + serializer(ar, + ReplyPromiseStreamReply::acknowledgeToken, ReplyPromiseStreamReply::sequence, + granuleRange, + doSplit, + epoch, + seqno, + granuleID, + startVersion, + latestVersion); } }; diff --git a/fdbclient/ClientKnobs.cpp b/fdbclient/ClientKnobs.cpp index 4f0021cb6d..880d67b664 100644 --- a/fdbclient/ClientKnobs.cpp +++ b/fdbclient/ClientKnobs.cpp @@ -73,6 +73,7 @@ void ClientKnobs::initialize(Randomize randomize) { init( KEY_SIZE_LIMIT, 1e4 ); init( SYSTEM_KEY_SIZE_LIMIT, 3e4 ); init( VALUE_SIZE_LIMIT, 1e5 ); + // TODO find better solution? 
init( SPLIT_KEY_SIZE_LIMIT, KEY_SIZE_LIMIT/2 );// if( randomize && BUGGIFY ) SPLIT_KEY_SIZE_LIMIT = KEY_SIZE_LIMIT - 31;//serverKeysPrefixFor(UID()).size() - 1; init( METADATA_VERSION_CACHE_SIZE, 1000 ); diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 7a6f89c1b0..fa5eb7c642 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -7296,12 +7296,16 @@ ACTOR Future readBlobGranulesStreamActor(Reference db, for (i = 0; i < blobGranuleMapping.size() - 1; i++) { granuleStartKey = blobGranuleMapping[i].key; granuleEndKey = blobGranuleMapping[i + 1].key; + // if this was a time travel and the request returned larger bounds, skip this chunk + if (granuleEndKey <= keyRange.begin) { + continue; + } workerId = decodeBlobGranuleMappingValue(blobGranuleMapping[i].value); // prune first/last granules to requested range - if (i == 0) { + if (keyRange.begin > granuleStartKey) { granuleStartKey = keyRange.begin; } - if (i == blobGranuleMapping.size() - 2) { + if (keyRange.end < granuleEndKey) { granuleEndKey = keyRange.end; } diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp index daedfa740d..5ab492842e 100644 --- a/fdbclient/SystemData.cpp +++ b/fdbclient/SystemData.cpp @@ -1100,40 +1100,131 @@ const KeyRangeRef blobRangeKeys(LiteralStringRef("\xff\x02/blobRange/"), Literal const KeyRef blobManagerEpochKey = LiteralStringRef("\xff\x02/blobManagerEpoch"); const Value blobManagerEpochValueFor(int64_t epoch) { - BinaryWriter wr(Unversioned()); + BinaryWriter wr(IncludeVersion(ProtocolVersion::withBlobGranule())); wr << epoch; return wr.toValue(); } int64_t decodeBlobManagerEpochValue(ValueRef const& value) { int64_t epoch; - BinaryReader reader(value, Unversioned()); + BinaryReader reader(value, IncludeVersion()); reader >> epoch; return epoch; } -// blob range file data +// blob granule data const KeyRangeRef blobGranuleFileKeys(LiteralStringRef("\xff\x02/bgf/"), LiteralStringRef("\xff\x02/bgf0")); const KeyRangeRef blobGranuleMappingKeys(LiteralStringRef("\xff\x02/bgm/"), LiteralStringRef("\xff\x02/bgm0")); const KeyRangeRef blobGranuleLockKeys(LiteralStringRef("\xff\x02/bgl/"), LiteralStringRef("\xff\x02/bgl0")); const KeyRangeRef blobGranuleSplitKeys(LiteralStringRef("\xff\x02/bgs/"), LiteralStringRef("\xff\x02/bgs0")); const KeyRangeRef blobGranuleHistoryKeys(LiteralStringRef("\xff\x02/bgh/"), LiteralStringRef("\xff\x02/bgh0")); +const uint8_t BG_FILE_TYPE_DELTA = 'D'; +const uint8_t BG_FILE_TYPE_SNAPSHOT = 'S'; + +// uids in blob granule file/split keys are serialized big endian so that incrementUID can create a prefix range for +// just that UID. + +// TODO: could move this to UID or sometwhere else? 
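A useful aside on the encoding above: FDB compares keys bytewise, and only the big-endian image of a 64-bit integer sorts in the same order as the integer itself, which is what lets incrementUID below bound a prefix range covering exactly one granule's keys. A minimal standalone sketch of that property, assuming nothing from flow (encodeBigEndian64 is a hypothetical stand-in for bigEndian64 plus BinaryWriter):

#include <cassert>
#include <cstdint>
#include <string>

// hypothetical stand-in for serializing bigEndian64(v) with BinaryWriter
std::string encodeBigEndian64(uint64_t v) {
    std::string out(8, '\0');
    for (int i = 7; i >= 0; i--) {
        out[i] = char(v & 0xff);
        v >>= 8;
    }
    return out;
}

int main() {
    // 1 < 256 numerically, and the big-endian bytes compare the same way, so
    // a range read over [prefix + uid, prefix + incrementUID(uid)) covers
    // exactly one UID's keys
    assert(encodeBigEndian64(1) < encodeBigEndian64(256));
    // little-endian would invert this: 256 -> 00 01 00 ... sorts before 1 -> 01 00 ...
    return 0;
}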
+UID incrementUID(UID uid) { + uint64_t first = uid.first(); + uint64_t second = uid.second() + 1; + // handle overflow increment of second + if (second == 0) { + first++; + // FIXME: assume we never generate max uid, for now + ASSERT(first != 0); + } + + return UID(first, second); +} + +void serializeUIDBigEndian(BinaryWriter& wr, UID uid) { + wr << bigEndian64(uid.first()); + wr << bigEndian64(uid.second()); +} + +UID deserializeUIDBigEndian(BinaryReader& reader) { + uint64_t first; + uint64_t second; + reader >> first; + reader >> second; + return UID(bigEndian64(first), bigEndian64(second)); +} + +const Key blobGranuleFileKeyFor(UID granuleID, uint8_t fileType, Version fileVersion) { + ASSERT(fileType == 'D' || fileType == 'S'); + BinaryWriter wr(AssumeVersion(ProtocolVersion::withBlobGranule())); + wr.serializeBytes(blobGranuleFileKeys.begin); + serializeUIDBigEndian(wr, granuleID); + wr << fileType; + wr << bigEndian64(fileVersion); + return wr.toValue(); +} + +std::tuple decodeBlobGranuleFileKey(KeyRef const& key) { + uint8_t fileType; + Version fileVersion; + BinaryReader reader(key.removePrefix(blobGranuleFileKeys.begin), AssumeVersion(ProtocolVersion::withBlobGranule())); + UID granuleID = deserializeUIDBigEndian(reader); + reader >> fileType; + reader >> fileVersion; + ASSERT(fileType == 'D' || fileType == 'S'); + return std::tuple(granuleID, fileType, bigEndian64(fileVersion)); +} + +Key bgFilePrefixKey(UID gid) { + BinaryWriter wr(AssumeVersion(ProtocolVersion::withBlobGranule())); + wr.serializeBytes(blobGranuleFileKeys.begin); + serializeUIDBigEndian(wr, gid); + return wr.toValue(); +} + +const KeyRange blobGranuleFileKeyRangeFor(UID granuleID) { + return KeyRangeRef(bgFilePrefixKey(granuleID), bgFilePrefixKey(incrementUID(granuleID))); +} + +const Value blobGranuleFileValueFor(StringRef const& filename, int64_t offset, int64_t length) { + BinaryWriter wr(IncludeVersion(ProtocolVersion::withBlobGranule())); + wr << filename; + wr << offset; + wr << length; + return wr.toValue(); +} + +std::tuple, int64_t, int64_t> decodeBlobGranuleFileValue(ValueRef const& value) { + StringRef filename; + int64_t offset; + int64_t length; + BinaryReader reader(value, IncludeVersion()); + reader >> filename; + reader >> offset; + reader >> length; + return std::tuple(filename, offset, length); +} + const Value blobGranuleMappingValueFor(UID const& workerID) { - BinaryWriter wr(Unversioned()); + BinaryWriter wr(IncludeVersion(ProtocolVersion::withBlobGranule())); wr << workerID; return wr.toValue(); } UID decodeBlobGranuleMappingValue(ValueRef const& value) { UID workerID; - BinaryReader reader(value, Unversioned()); + BinaryReader reader(value, IncludeVersion()); reader >> workerID; return workerID; } +const Key blobGranuleLockKeyFor(KeyRangeRef const& keyRange) { + BinaryWriter wr(AssumeVersion(ProtocolVersion::withBlobGranule())); + wr.serializeBytes(blobGranuleLockKeys.begin); + wr << keyRange; + return wr.toValue(); +} + const Value blobGranuleLockValueFor(int64_t epoch, int64_t seqno, UID changeFeedId) { - BinaryWriter wr(Unversioned()); + BinaryWriter wr(IncludeVersion(ProtocolVersion::withBlobGranule())); wr << epoch; wr << seqno; wr << changeFeedId; @@ -1143,15 +1234,44 @@ const Value blobGranuleLockValueFor(int64_t epoch, int64_t seqno, UID changeFeed std::tuple decodeBlobGranuleLockValue(const ValueRef& value) { int64_t epoch, seqno; UID changeFeedId; - BinaryReader reader(value, Unversioned()); + BinaryReader reader(value, IncludeVersion()); reader >> epoch; reader >> seqno; reader 
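// (aside, not part of this patch) the encoders in this file now use
// IncludeVersion(ProtocolVersion::withBlobGranule()) for values, which
// prefixes the serialized bytes with a protocol version so the format can
// evolve, but AssumeVersion for keys, since a version header embedded in a
// key would break the byte ordering and prefix ranges described above.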
>> changeFeedId; return std::make_tuple(epoch, seqno, changeFeedId); } +const Key blobGranuleSplitKeyFor(UID const& parentGranuleID, UID const& granuleID) { + BinaryWriter wr(AssumeVersion(ProtocolVersion::withBlobGranule())); + wr.serializeBytes(blobGranuleSplitKeys.begin); + serializeUIDBigEndian(wr, parentGranuleID); + serializeUIDBigEndian(wr, granuleID); + return wr.toValue(); +} + +std::pair decodeBlobGranuleSplitKey(KeyRef const& key) { + + BinaryReader reader(key.removePrefix(blobGranuleSplitKeys.begin), + AssumeVersion(ProtocolVersion::withBlobGranule())); + + UID parentGranuleID = deserializeUIDBigEndian(reader); + UID currentGranuleID = deserializeUIDBigEndian(reader); + return std::pair(parentGranuleID, currentGranuleID); +} + +Key bgSplitPrefixKeyFor(UID gid) { + BinaryWriter wr(AssumeVersion(ProtocolVersion::withBlobGranule())); + wr.serializeBytes(blobGranuleSplitKeys.begin); + serializeUIDBigEndian(wr, gid); + return wr.toValue(); +} + +const KeyRange blobGranuleSplitKeyRangeFor(UID const& parentGranuleID) { + return KeyRangeRef(bgSplitPrefixKeyFor(parentGranuleID), bgSplitPrefixKeyFor(incrementUID(parentGranuleID))); +} + const Value blobGranuleSplitValueFor(BlobGranuleSplitState st) { - BinaryWriter wr(Unversioned()); + BinaryWriter wr(IncludeVersion(ProtocolVersion::withBlobGranule())); wr << st; return addVersionStampAtEnd(wr.toValue()); } @@ -1159,49 +1279,67 @@ const Value blobGranuleSplitValueFor(BlobGranuleSplitState st) { std::pair decodeBlobGranuleSplitValue(const ValueRef& value) { BlobGranuleSplitState st; Version v; - BinaryReader reader(value, Unversioned()); + BinaryReader reader(value, IncludeVersion()); reader >> st; reader >> v; return std::pair(st, v); } -// const Value blobGranuleHistoryValueFor(VectorRef const& parentGranules); -// VectorRef decodeBlobGranuleHistoryValue(ValueRef const& value); - -const Value blobGranuleHistoryValueFor(Standalone> const& parentGranules) { - // FIXME: make separate version for BG - BinaryWriter wr(IncludeVersion(ProtocolVersion::withChangeFeed())); - wr << parentGranules; +const Key blobGranuleHistoryKeyFor(KeyRangeRef const& range, Version version) { + BinaryWriter wr(AssumeVersion(ProtocolVersion::withBlobGranule())); + wr.serializeBytes(blobGranuleHistoryKeys.begin); + wr << range; + wr << bigEndian64(version); return wr.toValue(); } -Standalone> decodeBlobGranuleHistoryValue(const ValueRef& value) { - Standalone> parentGranules; +std::pair decodeBlobGranuleHistoryKey(const KeyRef& key) { + KeyRangeRef keyRange; + Version version; + BinaryReader reader(key.removePrefix(blobGranuleHistoryKeys.begin), + AssumeVersion(ProtocolVersion::withBlobGranule())); + reader >> keyRange; + reader >> version; + return std::make_pair(keyRange, bigEndian64(version)); +} + +const KeyRange blobGranuleHistoryKeyRangeFor(KeyRangeRef const& range) { + return KeyRangeRef(blobGranuleHistoryKeyFor(range, 0), blobGranuleHistoryKeyFor(range, MAX_VERSION)); +} + +const Value blobGranuleHistoryValueFor(Standalone const& historyValue) { + BinaryWriter wr(IncludeVersion(ProtocolVersion::withBlobGranule())); + wr << historyValue; + return wr.toValue(); +} + +Standalone decodeBlobGranuleHistoryValue(const ValueRef& value) { + Standalone historyValue; BinaryReader reader(value, IncludeVersion()); - reader >> parentGranules; - return parentGranules; + reader >> historyValue; + return historyValue; } const KeyRangeRef blobWorkerListKeys(LiteralStringRef("\xff\x02/bwList/"), LiteralStringRef("\xff\x02/bwList0")); const Key blobWorkerListKeyFor(UID 
workerID) { - BinaryWriter wr(Unversioned()); + BinaryWriter wr(AssumeVersion(ProtocolVersion::withBlobGranule())); wr.serializeBytes(blobWorkerListKeys.begin); wr << workerID; return wr.toValue(); } -const Value blobWorkerListValue(BlobWorkerInterface const& worker) { - return ObjectWriter::toValue(worker, IncludeVersion()); -} - UID decodeBlobWorkerListKey(KeyRef const& key) { UID workerID; - BinaryReader reader(key.removePrefix(blobWorkerListKeys.begin), Unversioned()); + BinaryReader reader(key.removePrefix(blobWorkerListKeys.begin), AssumeVersion(ProtocolVersion::withBlobGranule())); reader >> workerID; return workerID; } +const Value blobWorkerListValue(BlobWorkerInterface const& worker) { + return ObjectWriter::toValue(worker, IncludeVersion(ProtocolVersion::withBlobGranule())); +} + BlobWorkerInterface decodeBlobWorkerListValue(ValueRef const& value) { BlobWorkerInterface interf; ObjectReader reader(value.begin(), IncludeVersion()); diff --git a/fdbclient/SystemData.h b/fdbclient/SystemData.h index bc8d1c591d..24e5fde32f 100644 --- a/fdbclient/SystemData.h +++ b/fdbclient/SystemData.h @@ -527,7 +527,10 @@ int64_t decodeBlobManagerEpochValue(ValueRef const& value); // blob granule keys -// \xff\x02/bgf/(startKey, endKey, {snapshot|delta}, version) = [[filename]] +extern const uint8_t BG_FILE_TYPE_DELTA; +extern const uint8_t BG_FILE_TYPE_SNAPSHOT; + +// \xff\x02/bgf/(granuleID, {snapshot|delta}, version) = [[filename]] extern const KeyRangeRef blobGranuleFileKeys; // TODO could shrink the size of the mapping keyspace by using something similar to tags instead of UIDs. We'd probably @@ -536,35 +539,53 @@ extern const KeyRangeRef blobGranuleFileKeys; // \xff\x02/bgm/[[begin]] = [[BlobWorkerUID]] extern const KeyRangeRef blobGranuleMappingKeys; -// \xff\x02/bgl/(begin,end) = (epoch, seqno, changefeed id) +// \xff\x02/bgl/(begin,end) = (epoch, seqno, granuleID) extern const KeyRangeRef blobGranuleLockKeys; -// \xff\x02/bgs/(oldbegin,oldend,newbegin) = state +// \xff\x02/bgs/(parentGranuleID, granuleID) = state extern const KeyRangeRef blobGranuleSplitKeys; -// \xff\x02/bgh/(start,end) = [(oldbegin, oldend)] +// \xff\x02/bgh/(start,end,version) = { granuleID, [parentGranuleHistoryKeys] } extern const KeyRangeRef blobGranuleHistoryKeys; +// FIXME: better way to represent file type than a string ref? +const Key blobGranuleFileKeyFor(UID granuleID, uint8_t fileType, Version fileVersion); +std::tuple decodeBlobGranuleFileKey(ValueRef const& value); +const KeyRange blobGranuleFileKeyRangeFor(UID granuleID); + +const Value blobGranuleFileValueFor(StringRef const& filename, int64_t offset, int64_t length); +std::tuple, int64_t, int64_t> decodeBlobGranuleFileValue(ValueRef const& value); + const Value blobGranuleMappingValueFor(UID const& workerID); UID decodeBlobGranuleMappingValue(ValueRef const& value); +const Key blobGranuleLockKeyFor(KeyRangeRef const& granuleRange); + const Value blobGranuleLockValueFor(int64_t epochNum, int64_t sequenceNum, UID changeFeedId); // FIXME: maybe just define a struct? 
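One possible answer to the FIXME above, sketched standalone; the struct shape and field names are guesses from the tuple's contents, with std::pair standing in for flow's UID:

#include <cstdint>
#include <tuple>
#include <utility>

using UID = std::pair<uint64_t, uint64_t>; // stand-in for flow's UID

struct BlobGranuleLockValue {
    int64_t epoch;
    int64_t seqno;
    UID changeFeedId;
};

// adapter from the tuple the current decoder returns, so call sites could
// write lock.epoch instead of std::get<0>(lock)
BlobGranuleLockValue fromLockTuple(const std::tuple<int64_t, int64_t, UID>& t) {
    return BlobGranuleLockValue{ std::get<0>(t), std::get<1>(t), std::get<2>(t) };
}

int main() {
    std::tuple<int64_t, int64_t, UID> t{ 7, 42, UID(1, 2) };
    auto v = fromLockTuple(t);
    return v.epoch == 7 && v.seqno == 42 ? 0 : 1;
}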
std::tuple decodeBlobGranuleLockValue(ValueRef const& value); +const Key blobGranuleSplitKeyFor(UID const& parentGranuleID, UID const& granuleID); +std::pair decodeBlobGranuleSplitKey(KeyRef const& key); +const KeyRange blobGranuleSplitKeyRangeFor(UID const& parentGranuleID); + // these are versionstamped const Value blobGranuleSplitValueFor(BlobGranuleSplitState st); std::pair decodeBlobGranuleSplitValue(ValueRef const& value); -const Value blobGranuleHistoryValueFor(Standalone> const& parentGranules); -Standalone> decodeBlobGranuleHistoryValue(ValueRef const& value); +const Key blobGranuleHistoryKeyFor(KeyRangeRef const& range, Version version); +std::pair decodeBlobGranuleHistoryKey(KeyRef const& value); +const KeyRange blobGranuleHistoryKeyRangeFor(KeyRangeRef const& range); + +const Value blobGranuleHistoryValueFor(Standalone const& historyValue); +Standalone decodeBlobGranuleHistoryValue(ValueRef const& value); // \xff/bwl/[[BlobWorkerID]] = [[BlobWorkerInterface]] extern const KeyRangeRef blobWorkerListKeys; const Key blobWorkerListKeyFor(UID workerID); -const Value blobWorkerListValue(BlobWorkerInterface const& interface); UID decodeBlobWorkerListKey(KeyRef const& key); +const Value blobWorkerListValue(BlobWorkerInterface const& interface); BlobWorkerInterface decodeBlobWorkerListValue(ValueRef const& value); #pragma clang diagnostic pop diff --git a/fdbserver/BlobManager.actor.cpp b/fdbserver/BlobManager.actor.cpp index 3dcfdc87c9..6bebf4ee5c 100644 --- a/fdbserver/BlobManager.actor.cpp +++ b/fdbserver/BlobManager.actor.cpp @@ -25,7 +25,6 @@ #include "fdbclient/KeyRangeMap.h" #include "fdbclient/ReadYourWrites.h" #include "fdbclient/SystemData.h" -#include "fdbclient/Tuple.h" #include "fdbserver/BlobManagerInterface.h" #include "fdbserver/BlobWorker.actor.h" #include "fdbserver/Knobs.h" @@ -573,14 +572,13 @@ ACTOR Future monitorClientRanges(BlobManagerData* bmData) { } } -static Key granuleLockKey(KeyRange granuleRange) { - Tuple k; - k.append(granuleRange.begin).append(granuleRange.end); - return k.getDataAsStandalone().withPrefix(blobGranuleLockKeys.begin); -} - // FIXME: propagate errors here -ACTOR Future maybeSplitRange(BlobManagerData* bmData, UID currentWorkerId, KeyRange range) { +ACTOR Future maybeSplitRange(BlobManagerData* bmData, + UID currentWorkerId, + KeyRange granuleRange, + UID granuleID, + Version granuleStartVersion, + Version latestVersion) { state Reference tr = makeReference(bmData->db); state Standalone> newRanges; state int64_t newLockSeqno = -1; @@ -590,7 +588,7 @@ ACTOR Future maybeSplitRange(BlobManagerData* bmData, UID currentWorkerId, try { // redo split if previous txn try failed to calculate it if (newRanges.empty()) { - Standalone> _newRanges = wait(splitRange(tr, range)); + Standalone> _newRanges = wait(splitRange(tr, granuleRange)); newRanges = _newRanges; } break; @@ -603,14 +601,14 @@ ACTOR Future maybeSplitRange(BlobManagerData* bmData, UID currentWorkerId, // not large enough to split, just reassign back to worker if (BM_DEBUG) { printf("Not splitting existing range [%s - %s). 
Continuing assignment to %s\n", - range.begin.printable().c_str(), - range.end.printable().c_str(), + granuleRange.begin.printable().c_str(), + granuleRange.end.printable().c_str(), currentWorkerId.toString().c_str()); } RangeAssignment raContinue; raContinue.isAssign = true; raContinue.worker = currentWorkerId; - raContinue.keyRange = range; + raContinue.keyRange = granuleRange; raContinue.assign = RangeAssignmentData(true); // continue assignment and re-snapshot bmData->rangesToAssign.send(raContinue); return Void(); @@ -628,7 +626,7 @@ ACTOR Future maybeSplitRange(BlobManagerData* bmData, UID currentWorkerId, wait(checkManagerLock(tr, bmData)); // acquire lock for old granule to make sure nobody else modifies it - state Key lockKey = granuleLockKey(range); + state Key lockKey = blobGranuleLockKeyFor(granuleRange); Optional lockValue = wait(tr->get(lockKey)); ASSERT(lockValue.present()); std::tuple prevGranuleLock = decodeBlobGranuleLockValue(lockValue.get()); @@ -638,8 +636,8 @@ ACTOR Future maybeSplitRange(BlobManagerData* bmData, UID currentWorkerId, bmData->id.toString().c_str(), std::get<0>(prevGranuleLock), bmData->epoch, - range.begin.printable().c_str(), - range.end.printable().c_str()); + granuleRange.begin.printable().c_str(), + granuleRange.end.printable().c_str()); } if (bmData->iAmReplaced.canBeSet()) { @@ -659,20 +657,24 @@ ACTOR Future maybeSplitRange(BlobManagerData* bmData, UID currentWorkerId, // acquire granule lock so nobody else can make changes to this granule. tr->set(lockKey, blobGranuleLockValueFor(bmData->epoch, newLockSeqno, std::get<2>(prevGranuleLock))); - Standalone> history; - history.push_back(history.arena(), range); - Value historyValue = blobGranuleHistoryValueFor(history); // set up split metadata for (int i = 0; i < newRanges.size() - 1; i++) { - Tuple splitKey; - splitKey.append(range.begin).append(range.end).append(newRanges[i]); - tr->atomicOp(splitKey.getDataAsStandalone().withPrefix(blobGranuleSplitKeys.begin), + UID newGranuleID = deterministicRandom()->randomUniqueID(); + + Key splitKey = blobGranuleSplitKeyFor(granuleID, newGranuleID); + + tr->atomicOp(splitKey, blobGranuleSplitValueFor(BlobGranuleSplitState::Started), MutationRef::SetVersionstampedValue); - Tuple historyKey; - historyKey.append(newRanges[i]).append(newRanges[i + 1]); - tr->set(historyKey.getDataAsStandalone().withPrefix(blobGranuleHistoryKeys.begin), historyValue); + Key historyKey = blobGranuleHistoryKeyFor(KeyRangeRef(newRanges[i], newRanges[i + 1]), latestVersion); + + Standalone historyValue; + historyValue.granuleID = newGranuleID; + historyValue.parentGranules.push_back(historyValue.arena(), + std::pair(granuleRange, granuleStartVersion)); + + tr->set(historyKey, blobGranuleHistoryValueFor(historyValue)); } wait(tr->commit()); @@ -690,8 +692,8 @@ ACTOR Future maybeSplitRange(BlobManagerData* bmData, UID currentWorkerId, if (BM_DEBUG) { printf("Splitting range [%s - %s) into (%d):\n", - range.begin.printable().c_str(), - range.end.printable().c_str(), + granuleRange.begin.printable().c_str(), + granuleRange.end.printable().c_str(), newRanges.size() - 1); for (int i = 0; i < newRanges.size() - 1; i++) { printf(" [%s - %s)\n", newRanges[i].printable().c_str(), newRanges[i + 1].printable().c_str()); @@ -703,7 +705,7 @@ ACTOR Future maybeSplitRange(BlobManagerData* bmData, UID currentWorkerId, RangeAssignment raRevoke; raRevoke.isAssign = false; raRevoke.worker = currentWorkerId; - raRevoke.keyRange = range; + raRevoke.keyRange = granuleRange; raRevoke.revoke = 
RangeRevokeData(false); // not a dispose bmData->rangesToAssign.send(raRevoke); @@ -824,7 +826,8 @@ ACTOR Future monitorBlobWorkerStatus(BlobManagerData* bmData, BlobWorkerIn rep.granuleRange.end.printable().c_str()); } lastSeenSeqno.insert(rep.granuleRange, std::pair(rep.epoch, rep.seqno)); - bmData->addActor.send(maybeSplitRange(bmData, bwInterf.id(), rep.granuleRange)); + bmData->addActor.send(maybeSplitRange( + bmData, bwInterf.id(), rep.granuleRange, rep.granuleID, rep.startVersion, rep.latestVersion)); } } } catch (Error& e) { diff --git a/fdbserver/BlobWorker.actor.cpp b/fdbserver/BlobWorker.actor.cpp index 2d0a2bbf8f..c8dbc0d8c8 100644 --- a/fdbserver/BlobWorker.actor.cpp +++ b/fdbserver/BlobWorker.actor.cpp @@ -28,7 +28,6 @@ #include "fdbclient/DatabaseContext.h" #include "fdbclient/NativeAPI.actor.h" #include "fdbclient/Notified.h" -#include "fdbclient/Tuple.h" #include "fdbserver/BlobWorker.actor.h" #include "fdbserver/Knobs.h" #include "fdbserver/MutationTracking.h" @@ -60,16 +59,26 @@ struct GranuleFiles { std::deque deltaFiles; }; -// TODO needs better name, it's basically just "granule starting state" -struct GranuleChangeFeedInfo { - UID changeFeedId; +struct GranuleHistory { + KeyRange range; + Version version; + Standalone value; + + GranuleHistory() {} + + GranuleHistory(KeyRange range, Version version, Standalone value) + : range(range), version(version), value(value) {} +}; + +struct GranuleStartState { + UID granuleID; Version changeFeedStartVersion; Version previousDurableVersion; - Optional prevChangeFeedId; + Optional> parentGranule; bool doSnapshot; - Optional granuleSplitFrom; Optional blobFilesToSnapshot; Optional existingFiles; + Optional history; }; struct GranuleMetadata : NonCopyable, ReferenceCounted { @@ -97,11 +106,11 @@ struct GranuleMetadata : NonCopyable, ReferenceCounted { int64_t continueEpoch; int64_t continueSeqno; - Promise resumeSnapshot; - - // used to coordinate granule file updater is done Promise cancelled; Promise readable; + Promise historyLoaded; + + Promise resumeSnapshot; AssignBlobRangeRequest originalReq; @@ -117,14 +126,34 @@ struct GranuleRangeMetadata { int64_t lastSeqno; Reference activeMetadata; - Future assignFuture; + Future assignFuture; Future fileUpdaterFuture; + Future historyLoaderFuture; GranuleRangeMetadata() : lastEpoch(0), lastSeqno(0) {} GranuleRangeMetadata(int64_t epoch, int64_t seqno, Reference activeMetadata) : lastEpoch(epoch), lastSeqno(seqno), activeMetadata(activeMetadata) {} }; +// represents a previous version of a granule, and optionally the files that compose it +struct GranuleHistoryEntry : NonCopyable, ReferenceCounted { + KeyRange range; + UID granuleID; + Version startVersion; // version of the first snapshot + Version endVersion; // version of the last delta file + + // load files lazily, and allows for clearing old cold-queried files to save memory + Future files; + + // FIXME: do skip pointers with single back-pointer and neighbor pointers + // Just parent reference for now (assumes no merging) + Reference parentGranule; + + GranuleHistoryEntry() : startVersion(invalidVersion), endVersion(invalidVersion) {} + GranuleHistoryEntry(KeyRange range, UID granuleID, Version startVersion, Version endVersion) + : range(range), granuleID(granuleID), startVersion(startVersion), endVersion(endVersion) {} +}; + // FIXME: there is a reference cycle here. BWData has GranuleRangeMetadata objects in a map, // but each of those has a future to a forever-running actor which has a reference to BWData. 
// To fix this, we should only pass the necessary, specfic fields of BWData to those actors @@ -147,6 +176,11 @@ struct BlobWorkerData : NonCopyable, ReferenceCounted { Reference bstore; KeyRangeMap granuleMetadata; + // contains the history of completed granules before the existing ones. Maps to the latest one, and has + // back-pointers to earlier granules + // FIXME: expire from map after a delay when granule is revoked and the history is no longer needed + KeyRangeMap> granuleHistory; + AsyncVar pendingDeltaFileCommitChecks; AsyncVar knownCommittedVersion; uint64_t knownCommittedCheckCount = 0; @@ -215,18 +249,11 @@ static void checkGranuleLock(int64_t epoch, int64_t seqno, int64_t ownerEpoch, i } } -// TODO this is duplicated with blob manager: fix? -static Key granuleLockKey(KeyRange granuleRange) { - Tuple k; - k.append(granuleRange.begin).append(granuleRange.end); - return k.getDataAsStandalone().withPrefix(blobGranuleLockKeys.begin); -} - ACTOR Future readAndCheckGranuleLock(Reference tr, KeyRange granuleRange, int64_t epoch, int64_t seqno) { - state Key lockKey = granuleLockKey(granuleRange); + state Key lockKey = blobGranuleLockKeyFor(granuleRange); Optional lockValue = wait(tr->get(lockKey)); ASSERT(lockValue.present()); @@ -239,58 +266,76 @@ ACTOR Future readAndCheckGranuleLock(Reference return Void(); } -ACTOR Future loadPreviousFiles(Transaction* tr, KeyRange keyRange) { - // read everything from previous granule of snapshot and delta files - Tuple prevFilesStartTuple; - prevFilesStartTuple.append(keyRange.begin).append(keyRange.end); - Tuple prevFilesEndTuple; - prevFilesEndTuple.append(keyRange.begin).append(keyAfter(keyRange.end)); - Key prevFilesStartKey = prevFilesStartTuple.getDataAsStandalone().withPrefix(blobGranuleFileKeys.begin); - Key prevFilesEndKey = prevFilesEndTuple.getDataAsStandalone().withPrefix(blobGranuleFileKeys.begin); - - state KeyRange currentRange = KeyRangeRef(prevFilesStartKey, prevFilesEndKey); - - state GranuleFiles files; +// used for "business logic" of both versions of loading granule files +ACTOR Future readGranuleFiles(Transaction* tr, Key* startKey, Key endKey, GranuleFiles* files, UID granuleID) { loop { int lim = BUGGIFY ? 
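// (aside) BUGGIFY randomly shrinks this batch limit in simulation so the
// res.more continuation path below actually gets exercised by tests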
2 : 1000; - RangeResult res = wait(tr->getRange(currentRange, lim)); + RangeResult res = wait(tr->getRange(KeyRangeRef(*startKey, endKey), lim)); for (auto& it : res) { - Tuple fileKey = Tuple::unpack(it.key.removePrefix(blobGranuleFileKeys.begin)); - Tuple fileValue = Tuple::unpack(it.value); + UID gid; + uint8_t fileType; + Version version; - ASSERT(fileKey.size() == 4); - ASSERT(fileValue.size() == 3); + Standalone filename; + int64_t offset; + int64_t length; - ASSERT(fileKey.getString(0) == keyRange.begin); - ASSERT(fileKey.getString(1) == keyRange.end); + std::tie(gid, fileType, version) = decodeBlobGranuleFileKey(it.key); + ASSERT(gid == granuleID); - std::string fileType = fileKey.getString(2).toString(); - ASSERT(fileType == LiteralStringRef("S") || fileType == LiteralStringRef("D")); + std::tie(filename, offset, length) = decodeBlobGranuleFileValue(it.value); - BlobFileIndex idx( - fileKey.getInt(3), fileValue.getString(0).toString(), fileValue.getInt(1), fileValue.getInt(2)); - if (fileType == LiteralStringRef("S")) { - ASSERT(files.snapshotFiles.empty() || files.snapshotFiles.back().version < idx.version); - files.snapshotFiles.push_back(idx); + BlobFileIndex idx(version, filename.toString(), offset, length); + if (fileType == 'S') { + ASSERT(files->snapshotFiles.empty() || files->snapshotFiles.back().version < idx.version); + files->snapshotFiles.push_back(idx); } else { - ASSERT(files.deltaFiles.empty() || files.deltaFiles.back().version < idx.version); - files.deltaFiles.push_back(idx); + ASSERT(fileType == 'D'); + ASSERT(files->deltaFiles.empty() || files->deltaFiles.back().version < idx.version); + files->deltaFiles.push_back(idx); } } if (res.more) { - currentRange = KeyRangeRef(keyAfter(res.back().key), currentRange.end); + *startKey = keyAfter(res.back().key); } else { break; } } if (BW_DEBUG) { - printf("Loaded %d snapshot and %d delta previous files for [%s - %s)\n", - files.snapshotFiles.size(), - files.deltaFiles.size(), - keyRange.begin.printable().c_str(), - keyRange.end.printable().c_str()); + printf("Loaded %d snapshot and %d delta files for %s\n", + files->snapshotFiles.size(), + files->deltaFiles.size(), + granuleID.toString().c_str()); } + return Void(); +} + +// Read snapshot and delta files for granule history, for completed granule +// Retries on error local to this function +ACTOR Future loadHistoryFiles(Reference bwData, UID granuleID) { + state Transaction tr(bwData->db); + state KeyRange range = blobGranuleFileKeyRangeFor(granuleID); + state Key startKey = range.begin; + state GranuleFiles files; + loop { + try { + wait(readGranuleFiles(&tr, &startKey, range.end, &files, granuleID)); + return files; + } catch (Error& e) { + wait(tr.onError(e)); + } + } +} + +// read snapshot and delta files from previous owner of the active granule +// This is separated out from above because this is done as part of granule open transaction +ACTOR Future loadPreviousFiles(Transaction* tr, UID granuleID) { + state KeyRange range = blobGranuleFileKeyRangeFor(granuleID); + // no need to add conflict range for read b/c of granule lock + state Key startKey = range.begin; + state GranuleFiles files; + wait(readGranuleFiles(tr, &startKey, range.end, &files, granuleID)); return files; } @@ -317,20 +362,11 @@ ACTOR Future loadPreviousFiles(Transaction* tr, KeyRange keyRange) // sub-granules as part of its transaction. 
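Before the implementation, a standalone sketch of the state machine it enforces; the transition rule is paraphrased from this patch (the BlobGranuleSplitState ordering plus the last-child cleanup condition), not copied from it:

#include <cassert>

enum class SplitState { Unknown = 0, Started = 1, Assigned = 2, Done = 3 };

// a child granule's split state only moves forward; a stale retry that
// proposes an equal or earlier state is ignored
bool shouldPersist(SplitState current, SplitState proposed) {
    return static_cast<int>(current) < static_cast<int>(proposed);
}

// the last child to go Assigned -> Done may garbage-collect the parent:
// its change feed, its lock key, and the whole split-key range
bool lastChildDone(SplitState current, SplitState proposed, int totalDone, int totalChildren) {
    return proposed == SplitState::Done && current == SplitState::Assigned &&
           totalDone == totalChildren - 1;
}

int main() {
    assert(shouldPersist(SplitState::Started, SplitState::Assigned));
    assert(!shouldPersist(SplitState::Done, SplitState::Assigned)); // stale retry
    assert(lastChildDone(SplitState::Assigned, SplitState::Done, 2, 3));
    assert(!lastChildDone(SplitState::Assigned, SplitState::Done, 1, 3)); // siblings still splitting
    return 0;
}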
ACTOR Future updateGranuleSplitState(Transaction* tr, - KeyRange previousGranule, - KeyRange currentGranule, - UID prevChangeFeedId, + KeyRange parentGranuleRange, + UID parentGranuleID, + UID currentGranuleID, BlobGranuleSplitState newState) { - // read all splitting state for previousGranule. If it doesn't exist, newState must == DONE - Tuple splitStateStartTuple; - splitStateStartTuple.append(previousGranule.begin).append(previousGranule.end); - Tuple splitStateEndTuple; - splitStateEndTuple.append(previousGranule.begin).append(keyAfter(previousGranule.end)); - - Key splitStateStartKey = splitStateStartTuple.getDataAsStandalone().withPrefix(blobGranuleSplitKeys.begin); - Key splitStateEndKey = splitStateEndTuple.getDataAsStandalone().withPrefix(blobGranuleSplitKeys.begin); - - state KeyRange currentRange = KeyRangeRef(splitStateStartKey, splitStateEndKey); + state KeyRange currentRange = blobGranuleSplitKeyRangeFor(parentGranuleID); RangeResult totalState = wait(tr->getRange(currentRange, 100)); // TODO is this explicit conflit range necessary with the above read? @@ -340,9 +376,7 @@ ACTOR Future updateGranuleSplitState(Transaction* tr, if (totalState.empty()) { ASSERT(newState == BlobGranuleSplitState::Done); if (BW_DEBUG) { - printf("Found empty split state for previous granule [%s - %s)\n", - previousGranule.begin.printable().c_str(), - previousGranule.end.printable().c_str()); + printf("Found empty split state for parent granule %s\n", parentGranuleID.toString().c_str()); } // must have retried and successfully nuked everything return Void(); @@ -354,9 +388,12 @@ ACTOR Future updateGranuleSplitState(Transaction* tr, int totalDone = 0; BlobGranuleSplitState currentState = BlobGranuleSplitState::Unknown; for (auto& it : totalState) { - Tuple key = Tuple::unpack(it.key.removePrefix(blobGranuleSplitKeys.begin)); - ASSERT(key.getString(0) == previousGranule.begin); - ASSERT(key.getString(1) == previousGranule.end); + UID pid; + UID cid; + std::pair k = decodeBlobGranuleSplitKey(it.key); + pid = k.first; + cid = k.second; + ASSERT(pid == parentGranuleID); BlobGranuleSplitState st = decodeBlobGranuleSplitValue(it.value).first; ASSERT(st != BlobGranuleSplitState::Unknown); @@ -365,7 +402,7 @@ ACTOR Future updateGranuleSplitState(Transaction* tr, } else if (st == BlobGranuleSplitState::Done) { totalDone++; } - if (key.getString(2) == currentGranule.begin) { + if (cid == currentGranuleID) { ASSERT(currentState == BlobGranuleSplitState::Unknown); currentState = st; } @@ -375,57 +412,44 @@ ACTOR Future updateGranuleSplitState(Transaction* tr, if (currentState < newState) { if (BW_DEBUG) { - printf("Updating granule [%s - %s) split state from [%s - %s) %d -> %d\n", - currentGranule.begin.printable().c_str(), - currentGranule.end.printable().c_str(), - previousGranule.begin.printable().c_str(), - previousGranule.end.printable().c_str(), + printf("Updating granule %s split state from %s %d -> %d\n", + currentGranuleID.toString().c_str(), + parentGranuleID.toString().c_str(), currentState, newState); } - Tuple myStateTuple; - myStateTuple.append(previousGranule.begin).append(previousGranule.end).append(currentGranule.begin); - Key myStateKey = myStateTuple.getDataAsStandalone().withPrefix(blobGranuleSplitKeys.begin); + Key myStateKey = blobGranuleSplitKeyFor(parentGranuleID, currentGranuleID); if (newState == BlobGranuleSplitState::Done && currentState == BlobGranuleSplitState::Assigned && totalDone == total - 1) { // we are the last one to change from Assigned -> Done, so everything can be cleaned 
up for the old // change feed and splitting state if (BW_DEBUG) { - printf("[%s - %s) destroying old change feed %s and granule lock + split state for [%s - %s)\n", - currentGranule.begin.printable().c_str(), - currentGranule.end.printable().c_str(), - prevChangeFeedId.toString().c_str(), - previousGranule.begin.printable().c_str(), - previousGranule.end.printable().c_str()); + printf("%s destroying old granule %s\n", + currentGranuleID.toString().c_str(), + parentGranuleID.toString().c_str()); } - Key oldGranuleLockKey = granuleLockKey(previousGranule); - tr->destroyChangeFeed(KeyRef(prevChangeFeedId.toString())); + Key oldGranuleLockKey = blobGranuleLockKeyFor(parentGranuleRange); + tr->destroyChangeFeed(KeyRef(parentGranuleID.toString())); tr->clear(singleKeyRange(oldGranuleLockKey)); tr->clear(currentRange); } else { if (newState == BlobGranuleSplitState::Assigned && currentState == BlobGranuleSplitState::Started && totalStarted == 1) { if (BW_DEBUG) { - printf("[%s - %s) WOULD BE stopping old change feed %s for [%s - %s)\n", - currentGranule.begin.printable().c_str(), - currentGranule.end.printable().c_str(), - prevChangeFeedId.toString().c_str(), - previousGranule.begin.printable().c_str(), - previousGranule.end.printable().c_str()); + printf("%s WOULD BE stopping change feed for old granule %s\n", + currentGranuleID.toString().c_str(), + parentGranuleID.toString().c_str()); } // FIXME: enable once implemented // tr.stopChangeFeed(KeyRef(prevChangeFeedId.toString())); } - // TODO also add versionstamp tr->atomicOp(myStateKey, blobGranuleSplitValueFor(newState), MutationRef::SetVersionstampedValue); } } else if (BW_DEBUG) { - printf("Ignoring granule [%s - %s) split state from [%s - %s) %d -> %d\n", - currentGranule.begin.printable().c_str(), - currentGranule.end.printable().c_str(), - previousGranule.begin.printable().c_str(), - previousGranule.end.printable().c_str(), + printf("Ignoring granule %s split state from %s %d -> %d\n", + currentGranuleID.toString().c_str(), + parentGranuleID.toString().c_str(), currentState, newState); } @@ -433,43 +457,32 @@ ACTOR Future updateGranuleSplitState(Transaction* tr, return Void(); } -// returns the split state for a given granule on granule reassignment +// returns the split state for a given granule on granule reassignment. Assumes granule is in fact splitting, by the +// presence of the previous granule's lock key ACTOR Future> getGranuleSplitState(Transaction* tr, - KeyRange previousGranule, - KeyRange currentGranule) { - Tuple myStateTuple; - myStateTuple.append(previousGranule.begin).append(previousGranule.end).append(currentGranule.begin); - Key myStateKey = myStateTuple.getDataAsStandalone().withPrefix(blobGranuleSplitKeys.begin); + UID parentGranuleID, + UID currentGranuleID) { + Key myStateKey = blobGranuleSplitKeyFor(parentGranuleID, currentGranuleID); Optional st = wait(tr->get(myStateKey)); - if (!st.present()) { - // must have been that all granules reached done and state was cleaned up - return std::pair(BlobGranuleSplitState::Done, invalidVersion); - } + ASSERT(st.present()); return decodeBlobGranuleSplitValue(st.get()); } -static Value getFileValue(std::string fname, int64_t offset, int64_t length) { - Tuple fileValue; - fileValue.append(fname).append(offset).append(length); - return fileValue.getDataAsStandalone(); -} - -// writeDelta file writes speculatively in the common case to optimize throughput. 
It creates the s3 object even -// though the data in it may not yet be committed, and even though previous delta fiels with lower versioned data -// may still be in flight. The synchronization happens after the s3 file is written, but before we update the FDB -// index of what files exist. Before updating FDB, we ensure the version is committed and all previous delta files -// have updated FDB. +// writeDelta file writes speculatively in the common case to optimize throughput. It creates the s3 object even though +// the data in it may not yet be committed, and even though previous delta fiels with lower versioned data may still be +// in flight. The synchronization happens after the s3 file is written, but before we update the FDB index of what files +// exist. Before updating FDB, we ensure the version is committed and all previous delta files have updated FDB. ACTOR Future writeDeltaFile(Reference bwData, KeyRange keyRange, + UID granuleID, int64_t epoch, int64_t seqno, Arena deltaArena, GranuleDeltas deltasToWrite, Version currentDeltaVersion, Future previousDeltaFileFuture, - Optional oldChangeFeedDataComplete, - Optional oldChangeFeedId) { + Optional> oldGranuleComplete) { wait(delay(0, TaskPriority::BlobWorkerUpdateStorage)); // potentially kick off delta file commit check, if our version isn't already known to be committed state uint64_t checkCount = -1; @@ -516,27 +529,25 @@ ACTOR Future writeDeltaFile(Reference bwData, tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); try { wait(readAndCheckGranuleLock(tr, keyRange, epoch, seqno)); - Tuple deltaFileKey; - deltaFileKey.append(keyRange.begin).append(keyRange.end); - deltaFileKey.append(LiteralStringRef("D")).append(currentDeltaVersion); - Key dfKey = deltaFileKey.getDataAsStandalone().withPrefix(blobGranuleFileKeys.begin); - tr->set(dfKey, getFileValue(fname, 0, serialized.size())); + Key dfKey = blobGranuleFileKeyFor(granuleID, 'D', currentDeltaVersion); + Value dfValue = blobGranuleFileValueFor(fname, 0, serialized.size()); + tr->set(dfKey, dfValue); - // FIXME: if previous granule present and delta file version >= previous change feed version, update - // the state here - if (oldChangeFeedDataComplete.present()) { - ASSERT(oldChangeFeedId.present()); + // FIXME: if previous granule present and delta file version >= previous change feed version, update the + // state here + if (oldGranuleComplete.present()) { wait(updateGranuleSplitState(&tr->getTransaction(), - oldChangeFeedDataComplete.get(), - keyRange, - oldChangeFeedId.get(), + oldGranuleComplete.get().first, + oldGranuleComplete.get().second, + granuleID, BlobGranuleSplitState::Done)); } wait(tr->commit()); if (BW_DEBUG) { - printf("Granule [%s - %s) updated fdb with delta file %s of size %d at version %lld, cv=%lld\n", + printf("Granule %s [%s - %s) updated fdb with delta file %s of size %d at version %lld, cv=%lld\n", + granuleID.toString().c_str(), keyRange.begin.printable().c_str(), keyRange.end.printable().c_str(), fname.c_str(), @@ -579,10 +590,12 @@ ACTOR Future writeDeltaFile(Reference bwData, ACTOR Future writeSnapshot(Reference bwData, KeyRange keyRange, + UID granuleID, int64_t epoch, int64_t seqno, Version version, - PromiseStream rows) { + PromiseStream rows, + bool createGranuleHistory) { // TODO some sort of directory structure would be useful maybe? 
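// For example (illustrative only): a snapshot written at version 5 around
// unix time 1633730136 gets a name like
// 0123456789abcdef0123456789abcdef_T1633730136000_V5.snapshot. The random
// UID prefix keeps a retried upload from colliding with an earlier attempt,
// and the (offset, length) recorded in the FDB value locates the payload
// within the object.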
state std::string fname = deterministicRandom()->randomUniqueID().toString() + "_T" + std::to_string((uint64_t)(1000.0 * now())) + "_V" + std::to_string(version) + ".snapshot"; @@ -643,12 +656,8 @@ ACTOR Future writeSnapshot(Reference bwData, wait(objectFile->finish()); wait(delay(0, TaskPriority::BlobWorkerUpdateFDB)); - // object uploaded successfully, save it to system key space // TODO add conflict range for writes? - state Tuple snapshotFileKey; - snapshotFileKey.append(keyRange.begin).append(keyRange.end); - snapshotFileKey.append(LiteralStringRef("S")).append(version); state Reference tr = makeReference(bwData->db); state int numIterations = 0; @@ -658,8 +667,16 @@ ACTOR Future writeSnapshot(Reference bwData, tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); try { wait(readAndCheckGranuleLock(tr, keyRange, epoch, seqno)); - tr->set(snapshotFileKey.getDataAsStandalone().withPrefix(blobGranuleFileKeys.begin), - getFileValue(fname, 0, serialized.size())); + Key snapshotFileKey = blobGranuleFileKeyFor(granuleID, 'S', version); + Key snapshotFileValue = blobGranuleFileValueFor(fname, 0, serialized.size()); + tr->set(snapshotFileKey, snapshotFileValue); + // create granule history at version if this is a new granule with the initial dump from FDB + if (createGranuleHistory) { + Key historyKey = blobGranuleHistoryKeyFor(keyRange, version); + Standalone historyValue; + historyValue.granuleID = granuleID; + tr->set(historyKey, blobGranuleHistoryValueFor(historyValue)); + } wait(tr->commit()); break; } catch (Error& e) { @@ -705,7 +722,8 @@ ACTOR Future writeSnapshot(Reference bwData, } ACTOR Future dumpInitialSnapshotFromFDB(Reference bwData, - Reference metadata) { + Reference metadata, + UID granuleID) { if (BW_DEBUG) { printf("Dumping snapshot from FDB for [%s - %s)\n", metadata->keyRange.begin.printable().c_str(), @@ -719,8 +737,14 @@ ACTOR Future dumpInitialSnapshotFromFDB(Reference try { state Version readVersion = wait(tr->getReadVersion()); state PromiseStream rowsStream; - state Future snapshotWriter = writeSnapshot( - bwData, metadata->keyRange, metadata->originalEpoch, metadata->originalSeqno, readVersion, rowsStream); + state Future snapshotWriter = writeSnapshot(bwData, + metadata->keyRange, + granuleID, + metadata->originalEpoch, + metadata->originalSeqno, + readVersion, + rowsStream, + true); loop { // TODO: use streaming range read @@ -752,9 +776,11 @@ ACTOR Future dumpInitialSnapshotFromFDB(Reference } } -// files might not be the current set of files in metadata, in the case of doing the initial snapshot of a granule. +// files might not be the current set of files in metadata, in the case of doing the initial snapshot of a granule that +// was split. 
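As context for the compaction below, a standalone sketch of how a read at some version chooses among a granule's files; these are assumed read-path semantics, consistent with the ascending-version ASSERTs in readGranuleFiles rather than lifted from this patch:

#include <cassert>
#include <vector>

// file versions are strictly increasing, as readGranuleFiles asserts
long pickSnapshot(const std::vector<long>& snapshots, long readVersion) {
    long best = -1; // -1: no snapshot old enough to serve this read
    for (long v : snapshots)
        if (v <= readVersion)
            best = v;
    return best;
}

std::vector<long> pickDeltas(const std::vector<long>& deltas, long snapshotVersion, long readVersion) {
    std::vector<long> out;
    for (long v : deltas) {
        if (v <= snapshotVersion)
            continue; // already folded into the snapshot
        out.push_back(v);
        if (v >= readVersion)
            break; // this file covers readVersion; the reader filters the tail
    }
    return out;
}

int main() {
    std::vector<long> snaps = { 100, 200 };
    std::vector<long> deltas = { 150, 250, 300 };
    assert(pickSnapshot(snaps, 260) == 200);
    // the file at 300 still holds the mutations between 250 and 260
    assert((pickDeltas(deltas, 200, 260) == std::vector<long>{ 250, 300 }));
    return 0;
}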
ACTOR Future compactFromBlob(Reference bwData, Reference metadata, + UID granuleID, GranuleFiles files) { wait(delay(0, TaskPriority::BlobWorkerUpdateStorage)); if (BW_DEBUG) { @@ -804,8 +830,14 @@ ACTOR Future compactFromBlob(Reference bwData, loop { try { state PromiseStream rowsStream; - state Future snapshotWriter = writeSnapshot( - bwData, metadata->keyRange, metadata->originalEpoch, metadata->originalSeqno, version, rowsStream); + state Future snapshotWriter = writeSnapshot(bwData, + metadata->keyRange, + granuleID, + metadata->originalEpoch, + metadata->originalSeqno, + version, + rowsStream, + false); RangeResult newGranule = wait(readBlobGranule(chunk, metadata->keyRange, version, bwData->bstore, &bwData->stats)); @@ -1026,17 +1058,17 @@ static Version doGranuleRollback(Reference metadata, // FIXME: handle errors here (forward errors) ACTOR Future blobGranuleUpdateFiles(Reference bwData, Reference metadata, - Future assignFuture) { + Future assignFuture) { state PromiseStream>> oldChangeFeedStream; state PromiseStream>> changeFeedStream; state Future inFlightBlobSnapshot; state std::deque inFlightDeltaFiles; state Future oldChangeFeedFuture; state Future changeFeedFuture; - state GranuleChangeFeedInfo changeFeedInfo; + state GranuleStartState startState; state bool readOldChangeFeed; state bool lastFromOldChangeFeed = false; - state Optional oldChangeFeedDataComplete; + state Optional> oldChangeFeedDataComplete; state Key cfKey; state Optional oldCFKey; @@ -1051,40 +1083,37 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, metadata->resumeSnapshot.send(Void()); // before starting, make sure worker persists range assignment and acquires the granule lock - GranuleChangeFeedInfo _info = wait(assignFuture); - changeFeedInfo = _info; + GranuleStartState _info = wait(assignFuture); + startState = _info; wait(delay(0, TaskPriority::BlobWorkerUpdateStorage)); - cfKey = StringRef(changeFeedInfo.changeFeedId.toString()); - if (changeFeedInfo.prevChangeFeedId.present()) { - oldCFKey = StringRef(changeFeedInfo.prevChangeFeedId.get().toString()); + cfKey = StringRef(startState.granuleID.toString()); + if (startState.parentGranule.present()) { + oldCFKey = StringRef(startState.parentGranule.get().second.toString()); } if (BW_DEBUG) { printf("Granule File Updater Starting for [%s - %s):\n", metadata->keyRange.begin.printable().c_str(), metadata->keyRange.end.printable().c_str()); - printf(" CFID: %s\n", changeFeedInfo.changeFeedId.toString().c_str()); - printf(" CF Start Version: %lld\n", changeFeedInfo.changeFeedStartVersion); - printf(" Previous Durable Version: %lld\n", changeFeedInfo.previousDurableVersion); - printf(" doSnapshot=%s\n", changeFeedInfo.doSnapshot ? "T" : "F"); + printf(" CFID: %s\n", startState.granuleID.toString().c_str()); + printf(" CF Start Version: %lld\n", startState.changeFeedStartVersion); + printf(" Previous Durable Version: %lld\n", startState.previousDurableVersion); + printf(" doSnapshot=%s\n", startState.doSnapshot ? "T" : "F"); printf(" Prev CFID: %s\n", - changeFeedInfo.prevChangeFeedId.present() ? changeFeedInfo.prevChangeFeedId.get().toString().c_str() - : ""); - printf(" granuleSplitFrom=%s\n", changeFeedInfo.granuleSplitFrom.present() ? "T" : "F"); - printf(" blobFilesToSnapshot=%s\n", changeFeedInfo.blobFilesToSnapshot.present() ? "T" : "F"); + startState.parentGranule.present() ? startState.parentGranule.get().second.toString().c_str() : ""); + printf(" blobFilesToSnapshot=%s\n", startState.blobFilesToSnapshot.present() ? 
"T" : "F"); } - // FIXME: handle reassigns by not doing a snapshot!! state Version startVersion; state BlobFileIndex newSnapshotFile; inFlightBlobSnapshot = Future(); // not valid! // if this is a reassign, calculate how close to a snapshot the previous owner was - if (changeFeedInfo.existingFiles.present()) { - GranuleFiles files = changeFeedInfo.existingFiles.get(); + if (startState.existingFiles.present()) { + GranuleFiles files = startState.existingFiles.get(); if (!files.snapshotFiles.empty() && !files.deltaFiles.empty()) { Version snapshotVersion = files.snapshotFiles.back().version; for (int i = files.deltaFiles.size() - 1; i >= 0; i--) { @@ -1093,31 +1122,34 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, } } } - metadata->files = changeFeedInfo.existingFiles.get(); + metadata->files = startState.existingFiles.get(); snapshotEligible = true; } - // FIXME: not true for reassigns - if (!changeFeedInfo.doSnapshot) { - startVersion = changeFeedInfo.previousDurableVersion; + if (!startState.doSnapshot) { + startVersion = startState.previousDurableVersion; ASSERT(!metadata->files.snapshotFiles.empty()); metadata->pendingSnapshotVersion = metadata->files.snapshotFiles.back().version; metadata->durableSnapshotVersion.set(metadata->pendingSnapshotVersion); } else { - if (changeFeedInfo.blobFilesToSnapshot.present()) { - inFlightBlobSnapshot = compactFromBlob(bwData, metadata, changeFeedInfo.blobFilesToSnapshot.get()); - startVersion = changeFeedInfo.previousDurableVersion; - metadata->durableSnapshotVersion.set( - changeFeedInfo.blobFilesToSnapshot.get().snapshotFiles.back().version); + if (startState.blobFilesToSnapshot.present()) { + inFlightBlobSnapshot = + compactFromBlob(bwData, metadata, startState.granuleID, startState.blobFilesToSnapshot.get()); + startVersion = startState.previousDurableVersion; + metadata->durableSnapshotVersion.set(startState.blobFilesToSnapshot.get().snapshotFiles.back().version); } else { - ASSERT(changeFeedInfo.previousDurableVersion == invalidVersion); - BlobFileIndex fromFDB = wait(dumpInitialSnapshotFromFDB(bwData, metadata)); + ASSERT(startState.previousDurableVersion == invalidVersion); + BlobFileIndex fromFDB = wait(dumpInitialSnapshotFromFDB(bwData, metadata, startState.granuleID)); newSnapshotFile = fromFDB; - ASSERT(changeFeedInfo.changeFeedStartVersion <= fromFDB.version); + ASSERT(startState.changeFeedStartVersion <= fromFDB.version); startVersion = newSnapshotFile.version; metadata->files.snapshotFiles.push_back(newSnapshotFile); metadata->durableSnapshotVersion.set(startVersion); + // construct fake history entry so we can store start version for splitting later + startState.history = + GranuleHistory(metadata->keyRange, startVersion, Standalone()); + wait(yield(TaskPriority::BlobWorkerUpdateStorage)); } metadata->pendingSnapshotVersion = startVersion; @@ -1130,20 +1162,19 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, ASSERT(metadata->readable.canBeSet()); metadata->readable.send(Void()); - if (changeFeedInfo.prevChangeFeedId.present()) { - // FIXME: once we have empty versions, only include up to changeFeedInfo.changeFeedStartVersion in the - // read stream. Then we can just stop the old stream when we get end_of_stream from this and not handle - // the mutation version truncation stuff + if (startState.parentGranule.present()) { + // FIXME: once we have empty versions, only include up to startState.changeFeedStartVersion in the read + // stream. 
Then we can just stop the old stream when we get end_of_stream from this and not handle the + // mutation version truncation stuff // FIXME: filtering on key range != change feed range doesn't work - ASSERT(changeFeedInfo.granuleSplitFrom.present()); readOldChangeFeed = true; oldChangeFeedFuture = bwData->db->getChangeFeedStream(oldChangeFeedStream, oldCFKey.get(), startVersion + 1, MAX_VERSION, - changeFeedInfo.granuleSplitFrom.get() /*metadata->keyRange*/); + startState.parentGranule.get().first /*metadata->keyRange*/); } else { readOldChangeFeed = false; changeFeedFuture = bwData->db->getChangeFeedStream( @@ -1173,7 +1204,7 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, metadata, completedDeltaFile, cfKey, - changeFeedInfo.changeFeedStartVersion, + startState.changeFeedStartVersion, rollbacksCompleted)); inFlightDeltaFiles.pop_front(); @@ -1196,16 +1227,13 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, Standalone> oldMutations = waitNext(oldChangeFeedStream.getFuture()); // TODO filter old mutations won't be necessary, SS does it already if (filterOldMutations( - metadata->keyRange, &oldMutations, &mutations, changeFeedInfo.changeFeedStartVersion)) { - // if old change feed has caught up with where new one would start, finish last one and start - // new one + metadata->keyRange, &oldMutations, &mutations, startState.changeFeedStartVersion)) { + // if old change feed has caught up with where new one would start, finish last one and start new + // one - Key cfKey = StringRef(changeFeedInfo.changeFeedId.toString()); - changeFeedFuture = bwData->db->getChangeFeedStream(changeFeedStream, - cfKey, - changeFeedInfo.changeFeedStartVersion, - MAX_VERSION, - metadata->keyRange); + Key cfKey = StringRef(startState.granuleID.toString()); + changeFeedFuture = bwData->db->getChangeFeedStream( + changeFeedStream, cfKey, startState.changeFeedStartVersion, MAX_VERSION, metadata->keyRange); oldChangeFeedFuture.cancel(); lastFromOldChangeFeed = true; @@ -1255,14 +1283,14 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, } Future dfFuture = writeDeltaFile(bwData, metadata->keyRange, + startState.granuleID, metadata->originalEpoch, metadata->originalSeqno, metadata->deltaArena, metadata->currentDeltas, metadata->bufferedDeltaVersion.get(), previousDeltaFileFuture, - oldChangeFeedDataComplete, - changeFeedInfo.prevChangeFeedId); + oldChangeFeedDataComplete); inFlightDeltaFiles.push_back(InFlightDeltaFile( dfFuture, metadata->bufferedDeltaVersion.get(), metadata->bufferedDeltaBytes)); @@ -1315,7 +1343,7 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, metadata, completedDeltaFile, cfKey, - changeFeedInfo.changeFeedStartVersion, + startState.changeFeedStartVersion, rollbacksCompleted)); wait(yield(TaskPriority::BlobWorkerUpdateStorage)); } @@ -1344,10 +1372,15 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, try { wait(bwData->currentManagerStatusStream.get().onReady()); bwData->currentManagerStatusStream.get().send( - GranuleStatusReply(metadata->keyRange, true, statusEpoch, statusSeqno)); + GranuleStatusReply(metadata->keyRange, + true, + statusEpoch, + statusSeqno, + startState.granuleID, + startState.history.get().version, + metadata->durableDeltaVersion.get())); break; } catch (Error& e) { - printf("manager stream was changed\n"); wait(bwData->currentManagerStatusStream.onChange()); } } @@ -1382,7 +1415,7 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, // Have to copy files object so that adding to it as we start writing new delta files in // parallel 
doesn't conflict. We could also pass the snapshot version and ignore any snapshot // files >= version and any delta files > version, but that's more complicated - inFlightBlobSnapshot = compactFromBlob(bwData, metadata, metadata->files); + inFlightBlobSnapshot = compactFromBlob(bwData, metadata, startState.granuleID, metadata->files); metadata->pendingSnapshotVersion = metadata->durableDeltaVersion.get(); // reset metadata @@ -1407,7 +1440,7 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, metadata, completedDeltaFile, cfKey, - changeFeedInfo.changeFeedStartVersion, + startState.changeFeedStartVersion, rollbacksCompleted)); wait(yield(TaskPriority::BlobWorkerUpdateStorage)); inFlightDeltaFiles.pop_front(); @@ -1479,7 +1512,7 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, oldCFKey.get(), cfRollbackVersion + 1, MAX_VERSION, - changeFeedInfo.granuleSplitFrom.get() /*metadata->keyRange*/); + startState.parentGranule.get().first /*metadata->keyRange*/); } else { changeFeedStream = PromiseStream>>(); changeFeedFuture = bwData->db->getChangeFeedStream(changeFeedStream, @@ -1524,13 +1557,13 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, readOldChangeFeed = false; lastFromOldChangeFeed = false; // set this so next delta file write updates granule split metadata to done - ASSERT(changeFeedInfo.granuleSplitFrom.present()); - oldChangeFeedDataComplete = changeFeedInfo.granuleSplitFrom; + ASSERT(startState.parentGranule.present()); + oldChangeFeedDataComplete = startState.parentGranule.get(); if (BW_DEBUG) { printf("Granule [%s - %s) switching to new change feed %s @ %lld\n", metadata->keyRange.begin.printable().c_str(), metadata->keyRange.end.printable().c_str(), - changeFeedInfo.changeFeedId.toString().c_str(), + startState.granuleID.toString().c_str(), metadata->bufferedDeltaVersion.get()); } } @@ -1572,6 +1605,117 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, } } +// walk graph back to previous known version +// Once loaded, go reverse up stack inserting each into the graph and setting its parent pointer. +// if a racing granule already loaded a prefix of the history, skip inserting entries already present +ACTOR Future blobGranuleLoadHistory(Reference bwData, + Reference metadata, + Future assignFuture) { + try { + GranuleStartState startState = wait(assignFuture); + state Optional activeHistory = startState.history; + + if (activeHistory.present() && activeHistory.get().value.parentGranules.size() > 0) { + state Transaction tr(bwData->db); + state GranuleHistory curHistory = activeHistory.get(); + ASSERT(activeHistory.get().value.parentGranules.size() == 1); + + state Version stopVersion; + auto prev = bwData->granuleHistory.rangeContaining(metadata->keyRange.begin); + // FIXME: not true for merges + ASSERT(prev.begin() <= metadata->keyRange.begin && prev.end() >= metadata->keyRange.end); + stopVersion = prev.value().isValid() ? 
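// (aside) stopVersion marks how far back the in-memory history graph
// already reaches for this range, so the backwards walk below can stop
// once a parent's start version falls at or below it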
@@ -1572,6 +1605,117 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
     }
 }

+// Walk the history graph back to the most recent version we already know about. Then walk back up the
+// stack, inserting each entry into the graph and setting its parent pointer.
+// If a racing load already inserted a prefix of the history, skip the entries that are already present.
+ACTOR Future<Void> blobGranuleLoadHistory(Reference<BlobWorkerData> bwData,
+                                          Reference<GranuleMetadata> metadata,
+                                          Future<GranuleStartState> assignFuture) {
+    try {
+        GranuleStartState startState = wait(assignFuture);
+        state Optional<GranuleHistory> activeHistory = startState.history;
+
+        if (activeHistory.present() && activeHistory.get().value.parentGranules.size() > 0) {
+            state Transaction tr(bwData->db);
+            state GranuleHistory curHistory = activeHistory.get();
+            ASSERT(activeHistory.get().value.parentGranules.size() == 1);
+
+            state Version stopVersion;
+            auto prev = bwData->granuleHistory.rangeContaining(metadata->keyRange.begin);
+            // FIXME: not true for merges
+            ASSERT(prev.begin() <= metadata->keyRange.begin && prev.end() >= metadata->keyRange.end);
+            stopVersion = prev.value().isValid() ? prev.value()->startVersion : invalidVersion;
+
+            state std::vector<Reference<GranuleHistoryEntry>> historyEntryStack;
+
+            // while the start version of the current granule's parent is larger than the last known start
+            // version, walk backwards
+            while (curHistory.value.parentGranules.size() > 0 &&
+                   curHistory.value.parentGranules[0].second > stopVersion) {
+                state GranuleHistory next;
+                loop {
+                    try {
+                        Optional<Value> v = wait(tr.get(blobGranuleHistoryKeyFor(
+                            curHistory.value.parentGranules[0].first, curHistory.value.parentGranules[0].second)));
+                        ASSERT(v.present());
+                        next = GranuleHistory(curHistory.value.parentGranules[0].first,
+                                              curHistory.value.parentGranules[0].second,
+                                              decodeBlobGranuleHistoryValue(v.get()));
+                        break;
+                    } catch (Error& e) {
+                        wait(tr.onError(e));
+                    }
+                }
+
+                ASSERT(next.version != invalidVersion);
+                // granule next.granuleID covers the version range [next.version, curHistory.version]
+                historyEntryStack.push_back(makeReference<GranuleHistoryEntry>(
+                    next.range, next.value.granuleID, next.version, curHistory.version));
+                curHistory = next;
+            }
+
+            // go back up the stack and apply history entries from oldest to newest, skipping ranges that
+            // were already applied by other racing loads.
+            // yielding in this loop would mean we'd need to re-check for load races
+            auto prev2 = bwData->granuleHistory.rangeContaining(metadata->keyRange.begin);
+            // FIXME: not true for merges
+            ASSERT(prev2.begin() <= metadata->keyRange.begin && prev2.end() >= metadata->keyRange.end);
+            stopVersion = prev2.value().isValid() ? prev2.value()->startVersion : invalidVersion;
+
+            int i = historyEntryStack.size() - 1;
+            while (i >= 0 && historyEntryStack[i]->startVersion <= stopVersion) {
+                i--;
+            }
+            int skipped = historyEntryStack.size() - 1 - i;
+
+            while (i >= 0) {
+                /*printf("Applying old granule [%s - %s) @ [%lld - %lld]\n",
+                       historyEntryStack[i]->range.begin.printable().c_str(),
+                       historyEntryStack[i]->range.end.printable().c_str(),
+                       historyEntryStack[i]->startVersion,
+                       historyEntryStack[i]->endVersion);*/
+
+                auto prevRanges = bwData->granuleHistory.rangeContaining(historyEntryStack[i]->range.begin);
+
+                /*printf("  prev range: [%s - %s) @ [%lld - %lld]\n",
+                       prevRanges.begin().printable().c_str(),
+                       prevRanges.end().printable().c_str(),
+                       prevRanges.value().isValid() ? prevRanges.value()->startVersion : invalidVersion,
+                       prevRanges.value().isValid() ? prevRanges.value()->endVersion : invalidVersion);*/
+
+                // sanity check
+                ASSERT(!prevRanges.value().isValid() ||
+                       prevRanges.value()->endVersion == historyEntryStack[i]->startVersion);
+
+                historyEntryStack[i]->parentGranule = prevRanges.value();
+                bwData->granuleHistory.insert(historyEntryStack[i]->range, historyEntryStack[i]);
+                i--;
+            }
+
+            if (BW_DEBUG) {
+                printf("Loaded %d history entries for granule [%s - %s) (%d skipped)\n",
+                       (int)historyEntryStack.size(),
+                       metadata->keyRange.begin.printable().c_str(),
+                       metadata->keyRange.end.printable().c_str(),
+                       skipped);
+            }
+        }
+
+        metadata->historyLoaded.send(Void());
+        return Void();
+    } catch (Error& e) {
+        if (e.code() == error_code_operation_cancelled || e.code() == error_code_granule_assignment_conflict) {
+            throw e;
+        }
+        if (BW_DEBUG) {
+            printf("Loading blob granule history got unexpected error %s\n", e.name());
+        }
+        // TODO this should never happen?
+        ASSERT(false);
+        throw e;
+    }
+}
+
 // TODO might want to separate this out for valid values for range assignments vs read requests. Assignment conflict
 // isn't valid for read requests but is for assignments
 namespace {
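The loader above fetches entries with blobGranuleHistoryKeyFor, whose encoding lives in fdbclient/SystemData.cpp and is only partly visible in this patch. One plausible shape, consistent with how blobGranuleHistoryKeyFor, decodeBlobGranuleHistoryKey, and blobGranuleHistoryKeyRangeFor are used here (a sketch, not the actual implementation):

    const Key blobGranuleHistoryKeyForSketch(KeyRangeRef range, Version version) {
        BinaryWriter wr(AssumeVersion(ProtocolVersion::withBlobGranule()));
        wr.serializeBytes(blobGranuleHistoryKeys.begin);
        wr << range;
        // big endian so that lexicographic key order matches version order, which is what
        // lets getLatestGranuleHistory below read the newest entry with a reverse getRange
        wr << bigEndian64(version);
        return wr.toValue();
    }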
@@ -1588,9 +1732,29 @@ bool canReplyWith(Error e) {
     }
 } // namespace

-ACTOR Future<Void> waitForVersionActor(Reference<GranuleMetadata> metadata, Version v) {
-    if (metadata->readable.canBeSet()) {
-        wait(metadata->readable.getFuture());
+// assumes metadata is already readable and the query is reading from the active granule, not a history one
+ACTOR Future<Void> waitForVersion(Reference<GranuleMetadata> metadata, Version v) {
+    // if we don't have to wait for the change feed version to catch up or for any pending file writes to
+    // complete, there is nothing to do
+
+    /*printf("  [%s - %s) waiting for %lld\n  readable:%s\n  bufferedDelta=%lld\n  pendingDelta=%lld\n  "
+           "durableDelta=%lld\n  pendingSnapshot=%lld\n  durableSnapshot=%lld\n",
+           metadata->keyRange.begin.printable().c_str(),
+           metadata->keyRange.end.printable().c_str(),
+           v,
+           metadata->readable.isSet() ? "T" : "F",
+           metadata->bufferedDeltaVersion.get(),
+           metadata->pendingDeltaVersion,
+           metadata->durableDeltaVersion.get(),
+           metadata->pendingSnapshotVersion,
+           metadata->durableSnapshotVersion.get());*/
+
+    if (v <= metadata->bufferedDeltaVersion.get() &&
+        (v <= metadata->durableDeltaVersion.get() ||
+         metadata->durableDeltaVersion.get() == metadata->pendingDeltaVersion) &&
+        (v <= metadata->durableSnapshotVersion.get() ||
+         metadata->durableSnapshotVersion.get() == metadata->pendingSnapshotVersion)) {
+        return Void();
     }

     // wait for change feed version to catch up to ensure we have all data
@@ -1624,35 +1788,6 @@ ACTOR Future<Void> waitForVersionActor(Reference<GranuleMetadata> metadata, Vers
     return Void();
 }

-static Future<Void> waitForVersion(Reference<GranuleMetadata> metadata, Version v) {
-    // if we don't have to wait for change feed version to catch up or wait for any pending file writes to complete,
-    // nothing to do
-
-    /*
-    printf("  [%s - %s) waiting for %lld\n  readable:%s\n  bufferedDelta=%lld\n  pendingDelta=%lld\n  "
-           "durableDelta=%lld\n  pendingSnapshot=%lld\n  durableSnapshot=%lld\n",
-           metadata->keyRange.begin.printable().c_str(),
-           metadata->keyRange.end.printable().c_str(),
-           v,
-           metadata->readable.isSet() ? "T" : "F",
-           metadata->bufferedDeltaVersion.get(),
-           metadata->pendingDeltaVersion,
-           metadata->durableDeltaVersion.get(),
-           metadata->pendingSnapshotVersion,
-           metadata->durableSnapshotVersion.get());
-    */
-
-    if (metadata->readable.isSet() && v <= metadata->bufferedDeltaVersion.get() &&
-        (v <= metadata->durableDeltaVersion.get() ||
-         metadata->durableDeltaVersion.get() == metadata->pendingDeltaVersion) &&
-        (v <= metadata->durableSnapshotVersion.get() ||
-         metadata->durableSnapshotVersion.get() == metadata->pendingSnapshotVersion)) {
-        return Void();
-    }
-
-    return waitForVersionActor(metadata, v);
-}
-
 ACTOR Future<Void> handleBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, BlobGranuleFileRequest req) {
     try {
         // TODO REMOVE in api V2
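Condensed, the fast path added above boils down to a single predicate: a read at version v can proceed immediately when the buffered change feed has reached v and no file write that v depends on is still in flight. The same test as a standalone function (illustrative only; the name is invented here):

    bool canReadWithoutWaiting(Version v,
                               Version bufferedDelta,
                               Version durableDelta, Version pendingDelta,
                               Version durableSnapshot, Version pendingSnapshot) {
        return v <= bufferedDelta &&
               (v <= durableDelta || durableDelta == pendingDelta) &&
               (v <= durableSnapshot || durableSnapshot == pendingSnapshot);
    }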
@@ -1694,48 +1829,97 @@ ACTOR Future<Void> handleBlobGranuleFileRequest(Reference<BlobWorkerData> bwData
     }

     // do work for each range
+    state Key readThrough = req.keyRange.begin;
     for (auto m : granules) {
+        if (readThrough >= m->keyRange.end) {
+            // previous read did time travel that already included this granule
+            // FIXME: this will get more complicated with merges where this could potentially include partial
+            // boundaries. For now with only splits we can skip the whole range
+            continue;
+        }
+        ASSERT(readThrough == m->keyRange.begin);
         state Reference<GranuleMetadata> metadata = m;
-        // try to check version_too_old, cancelled, waitForVersion without yielding first
-        if (metadata->readable.isSet() &&
-            ((!metadata->files.snapshotFiles.empty() &&
-              metadata->files.snapshotFiles.front().version > req.readVersion) ||
-             (metadata->files.snapshotFiles.empty() && metadata->pendingSnapshotVersion > req.readVersion))) {
-            if (BW_REQUEST_DEBUG) {
-                printf("Oldest snapshot file for [%s - %s) is @ %lld, later than request version %lld\n",
-                       req.keyRange.begin.printable().c_str(),
-                       req.keyRange.end.printable().c_str(),
-                       metadata->files.snapshotFiles.size() == 0 ? 0 : metadata->files.snapshotFiles[0].version,
-                       req.readVersion);
-            }
-            throw transaction_too_old();
+
+        if (metadata->readable.canBeSet()) {
+            wait(metadata->readable.getFuture());
         }

         if (metadata->cancelled.isSet()) {
             throw wrong_shard_server();
         }

-        loop {
-            Future<Void> waitForVersionFuture = waitForVersion(metadata, req.readVersion);
-            if (waitForVersionFuture.isReady()) {
-                // didn't yield, so no need to check rollback stuff
-                break;
-            }
-            // rollback resets all of the version information, so we have to redo wait for version on rollback
-            state int rollbackCount = metadata->rollbackCount.get();
-            choose {
-                when(wait(waitForVersionFuture)) {}
-                when(wait(metadata->rollbackCount.onChange())) {}
-                when(wait(metadata->cancelled.getFuture())) { throw wrong_shard_server(); }
-            }
-            if (rollbackCount == metadata->rollbackCount.get()) {
-                break;
-            } else if (BW_REQUEST_DEBUG) {
-                printf("[%s - %s) @ %lld hit rollback, restarting waitForVersion\n",
-                       req.keyRange.begin.printable().c_str(),
-                       req.keyRange.end.printable().c_str(),
-                       req.readVersion);
-            }
+        state KeyRange chunkRange;
+        state GranuleFiles chunkFiles;
+
+        if ((!metadata->files.snapshotFiles.empty() &&
+             metadata->files.snapshotFiles.front().version > req.readVersion) ||
+            (metadata->files.snapshotFiles.empty() && metadata->pendingSnapshotVersion > req.readVersion)) {
+            // this is a time travel query, find the previous granule
+            if (metadata->historyLoaded.canBeSet()) {
+                wait(metadata->historyLoaded.getFuture());
+            }
+
+            // FIXME: doesn't work once we add granule merging, could be multiple ranges and/or multiple parents
+            Reference<GranuleHistoryEntry> cur = bwData->granuleHistory.rangeContaining(req.keyRange.begin).value();
+            // FIXME: use skip pointers here
+            while (cur.isValid() && req.readVersion < cur->startVersion) {
+                cur = cur->parentGranule;
+            }
+
+            if (!cur.isValid()) {
+                // this request predates blob data
+                // FIXME: probably want a dedicated exception like blob_range_too_old or something instead
+                throw transaction_too_old();
+            }
+
+            // TODO: change back to just debug
+            if (BW_REQUEST_DEBUG) {
+                printf("[%s - %s) @ %lld time traveled back to %s [%s - %s) @ [%lld - %lld)\n",
+                       req.keyRange.begin.printable().c_str(),
+                       req.keyRange.end.printable().c_str(),
+                       req.readVersion,
+                       cur->granuleID.toString().c_str(),
+                       cur->range.begin.printable().c_str(),
+                       cur->range.end.printable().c_str(),
+                       cur->startVersion,
+                       cur->endVersion);
+            }
+
+            // lazily load files for the old granule if not present
+            chunkRange = cur->range;
+            if (cur->files.isError() || !cur->files.isValid()) {
+                cur->files = loadHistoryFiles(bwData, cur->granuleID);
+            }
+
+            GranuleFiles _f = wait(cur->files);
+            chunkFiles = _f;
+
+        } else {
+            // this is an active granule query
+            loop {
+                Future<Void> waitForVersionFuture = waitForVersion(metadata, req.readVersion);
+                if (waitForVersionFuture.isReady()) {
+                    // didn't yield, so no need to check rollback stuff
+                    break;
+                }
+                // rollback resets all of the version information, so we have to redo wait for version on rollback
+                state int rollbackCount = metadata->rollbackCount.get();
+                choose {
+                    when(wait(waitForVersionFuture)) {}
+                    when(wait(metadata->rollbackCount.onChange())) {}
+                    when(wait(metadata->cancelled.getFuture())) { throw wrong_shard_server(); }
+                }
+
+                if (rollbackCount == metadata->rollbackCount.get()) {
+                    break;
+                } else if (BW_REQUEST_DEBUG) {
+                    printf("[%s - %s) @ %lld hit rollback, restarting waitForVersion\n",
+                           req.keyRange.begin.printable().c_str(),
+                           req.keyRange.end.printable().c_str(),
+                           req.readVersion);
+                }
+            }
+            chunkFiles = metadata->files;
+            chunkRange = metadata->keyRange;
         }

         // granule is up to date, do read
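The active-granule branch above relies on a snapshot-and-recheck idiom that is easy to miss: because waitForVersion only yields on the slow path, the isReady() check needs no rollback handling at all, and the slow path captures rollbackCount before yielding and redoes the wait if it moved. Reduced to its skeleton (illustrative fragment, same names as above):

    state int rollbackCountBefore = metadata->rollbackCount.get();
    wait(waitForVersion(metadata, req.readVersion));
    if (rollbackCountBefore != metadata->rollbackCount.get()) {
        // a rollback interleaved with our wait and reset the version state; start over
    }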
@@ -1743,38 +1927,29 @@ ACTOR Future<Void> handleBlobGranuleFileRequest(Reference<BlobWorkerData> bwData
         BlobGranuleChunkRef chunk;
         // TODO change in V2
         chunk.includedVersion = req.readVersion;
-        chunk.keyRange = KeyRangeRef(StringRef(rep.arena, metadata->keyRange.begin),
-                                     StringRef(rep.arena, metadata->keyRange.end));
+        chunk.keyRange = KeyRangeRef(StringRef(rep.arena, chunkRange.begin), StringRef(rep.arena, chunkRange.end));

         // handle snapshot files
         // TODO refactor the "find snapshot file" logic to GranuleFiles?
         // FIXME: binary search instead of linear search, especially when file count is large
-        int i = metadata->files.snapshotFiles.size() - 1;
-        while (i >= 0 && metadata->files.snapshotFiles[i].version > req.readVersion) {
+        int i = chunkFiles.snapshotFiles.size() - 1;
+        while (i >= 0 && chunkFiles.snapshotFiles[i].version > req.readVersion) {
             i--;
         }
-        // if version is older than oldest snapshot file (or no snapshot files), throw too old
-        // FIXME: probably want a dedicated exception like blob_range_too_old or something instead
-        if (i < 0) {
-            if (BW_REQUEST_DEBUG) {
-                printf("Req [%s - %s) @ %lld Got too old after initial check.\n",
-                       req.keyRange.begin.printable().c_str(),
-                       req.keyRange.end.printable().c_str(),
-                       req.readVersion);
-            }
-            throw transaction_too_old();
-        }
+        // because of granule history, we should always be able to find the desired snapshot version, and we
+        // would have thrown transaction_too_old earlier if that were not possible
+        ASSERT(i >= 0);

-        BlobFileIndex snapshotF = metadata->files.snapshotFiles[i];
+        BlobFileIndex snapshotF = chunkFiles.snapshotFiles[i];
         chunk.snapshotFile = BlobFilenameRef(rep.arena, snapshotF.filename, snapshotF.offset, snapshotF.length);
-        Version snapshotVersion = metadata->files.snapshotFiles[i].version;
+        Version snapshotVersion = chunkFiles.snapshotFiles[i].version;

         // handle delta files
         // cast this to an int so i going to -1 still compares properly
-        int lastDeltaFileIdx = metadata->files.deltaFiles.size() - 1;
+        int lastDeltaFileIdx = chunkFiles.deltaFiles.size() - 1;
         i = lastDeltaFileIdx;
         // skip delta files that are too new
-        while (i >= 0 && metadata->files.deltaFiles[i].version > req.readVersion) {
+        while (i >= 0 && chunkFiles.deltaFiles[i].version > req.readVersion) {
             i--;
         }
         if (i < lastDeltaFileIdx) {
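The FIXME above asks for a binary search over the version-sorted snapshot list. One way to write it with std::upper_bound (a sketch with an invented helper name; it assumes the file list is a std::vector-like container kept sorted by version, which the linear scan already relies on):

    #include <algorithm>
    #include <vector>

    // index of the newest snapshot file with version <= readVersion, or -1 if none
    int findSnapshotFileIdx(const std::vector<BlobFileIndex>& snapshotFiles, Version readVersion) {
        auto it = std::upper_bound(snapshotFiles.begin(),
                                   snapshotFiles.end(),
                                   readVersion,
                                   [](Version v, const BlobFileIndex& f) { return v < f.version; });
        // it points at the first file strictly newer than readVersion
        return int(it - snapshotFiles.begin()) - 1;
    }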
@@ -1784,13 +1959,13 @@ ACTOR Future<Void> handleBlobGranuleFileRequest(Reference<BlobWorkerData> bwData
         }

         // only include delta files after the snapshot file
         int j = i;
-        while (j >= 0 && metadata->files.deltaFiles[j].version > snapshotVersion) {
+        while (j >= 0 && chunkFiles.deltaFiles[j].version > snapshotVersion) {
             j--;
         }
         j++;
         Version latestDeltaVersion = invalidVersion;
         while (j <= i) {
-            BlobFileIndex deltaF = metadata->files.deltaFiles[j];
+            BlobFileIndex deltaF = chunkFiles.deltaFiles[j];
             chunk.deltaFiles.emplace_back_deep(rep.arena, deltaF.filename, deltaF.offset, deltaF.length);
             bwData->stats.readReqDeltaBytesReturned += deltaF.length;
             latestDeltaVersion = deltaF.version;

@@ -1814,6 +1989,7 @@ ACTOR Future<Void> handleBlobGranuleFileRequest(Reference<BlobWorkerData> bwData
         rep.chunks.push_back(rep.arena, chunk);

         bwData->stats.readReqTotalFilesReturned += chunk.deltaFiles.size() + int(chunk.snapshotFile.present());
+        readThrough = chunk.keyRange.end;

         wait(yield(TaskPriority::DefaultEndpoint));
     }
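The next hunks replace persistAssignWorkerRange with openGranule. Before reading them, it helps to know the contract this reviewer assumes for acquireGranuleLock, which openGranule calls when a previous owner exists: a taker must present a manager (epoch, seqno) that is not older than the previous owner's, compared lexicographically. As a sketch (invented helper name, not the actual implementation):

    #include <tuple>

    void checkGranuleLock(int64_t reqEpoch, int64_t reqSeqno, int64_t prevEpoch, int64_t prevSeqno) {
        if (std::tie(reqEpoch, reqSeqno) < std::tie(prevEpoch, prevSeqno)) {
            // a newer manager already owns this granule; this assignment is stale
            throw granule_assignment_conflict();
        }
    }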
@@ -1833,14 +2009,27 @@ ACTOR Future<Void> handleBlobGranuleFileRequest(Reference<BlobWorkerData> bwData
     return Void();
 }

-ACTOR Future<GranuleChangeFeedInfo> persistAssignWorkerRange(Reference<BlobWorkerData> bwData,
-                                                             AssignBlobRangeRequest req) {
+ACTOR Future<Optional<GranuleHistory>> getLatestGranuleHistory(Transaction* tr, KeyRange range) {
+    KeyRange historyRange = blobGranuleHistoryKeyRangeFor(range);
+    RangeResult result = wait(tr->getRange(historyRange, 1, Snapshot::False, Reverse::True));
+    ASSERT(result.size() <= 1);
+
+    Optional<GranuleHistory> history;
+    if (!result.empty()) {
+        std::pair<KeyRange, Version> decodedKey = decodeBlobGranuleHistoryKey(result[0].key);
+        ASSERT(range == decodedKey.first);
+        history = GranuleHistory(range, decodedKey.second, decodeBlobGranuleHistoryValue(result[0].value));
+    }
+    return history;
+}
+
+ACTOR Future<GranuleStartState> openGranule(Reference<BlobWorkerData> bwData, AssignBlobRangeRequest req) {
     ASSERT(!req.continueAssignment);
     state Transaction tr(bwData->db);
-    state Key lockKey = granuleLockKey(req.keyRange);
-    state GranuleChangeFeedInfo info;
+    state Key lockKey = blobGranuleLockKeyFor(req.keyRange);
+    state GranuleStartState info;

     if (BW_DEBUG) {
-        printf("%s persisting assignment [%s - %s)\n",
+        printf("%s [%s - %s) opening\n",
                bwData->id.toString().c_str(),
                req.keyRange.begin.printable().c_str(),
                req.keyRange.end.printable().c_str());

@@ -1851,19 +2040,35 @@ ACTOR Future<GranuleChangeFeedInfo> persistAssignWorkerRange(Reference<BlobWorkerData> bwData,
-            Optional<Value> prevLockValue = wait(tr.get(lockKey));
+
+            state Future<Optional<Value>> fLockValue = tr.get(lockKey);
+            state Future<Optional<GranuleHistory>> fHistory = getLatestGranuleHistory(&tr, req.keyRange);
+            Optional<GranuleHistory> history = wait(fHistory);
+            info.history = history;
+            Optional<Value> prevLockValue = wait(fLockValue);

             state bool hasPrevOwner = prevLockValue.present();
             if (hasPrevOwner) {
                 std::tuple<int64_t, int64_t, UID> prevOwner = decodeBlobGranuleLockValue(prevLockValue.get());
                 acquireGranuleLock(req.managerEpoch, req.managerSeqno, std::get<0>(prevOwner), std::get<1>(prevOwner));
-                info.changeFeedId = std::get<2>(prevOwner);
+                info.granuleID = std::get<2>(prevOwner);

-                GranuleFiles granuleFiles = wait(loadPreviousFiles(&tr, req.keyRange));
+                // if it's the first snapshot of a new granule, history won't be present
+                if (info.history.present()) {
+                    ASSERT(info.granuleID == info.history.get().value.granuleID);
+                }
+
+                GranuleFiles granuleFiles = wait(loadPreviousFiles(&tr, info.granuleID));
                 info.existingFiles = granuleFiles;
                 info.doSnapshot = false;

+                if (!info.history.present()) {
+                    // if a lock already exists but no history entry does, this must be a new granule that died
+                    // before it could persist its initial snapshot from FDB
+                    ASSERT(info.existingFiles.get().snapshotFiles.empty());
+                }
+
                 if (info.existingFiles.get().snapshotFiles.empty()) {
                     ASSERT(info.existingFiles.get().deltaFiles.empty());
                     info.previousDurableVersion = invalidVersion;
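One small pattern in openGranule above deserves a note: the lock value and the latest history entry are fetched concurrently by starting both futures before the first wait, so the two reads overlap in one transaction round trip. The general shape (illustrative fragment; keyA and keyB are placeholders):

    state Future<Optional<Value>> fA = tr.get(keyA); // read starts immediately
    state Future<Optional<Value>> fB = tr.get(keyB); // issued in parallel with fA
    Optional<Value> a = wait(fA);
    Optional<Value> b = wait(fB); // usually already ready by now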
@@ -1879,59 +2084,62 @@ ACTOR Future<GranuleChangeFeedInfo> persistAssignWorkerRange(Reference<BlobWorkerData> bwData,
-                info.changeFeedId = deterministicRandom()->randomUniqueID();
-                wait(tr.registerChangeFeed(StringRef(info.changeFeedId.toString()), req.keyRange));
+                // FIXME: use actual 16 bytes of UID instead of converting it to a 32 character string and
+                // then that to bytes
+
+                if (info.history.present()) {
+                    // if this granule is derived from a split or merge, this history entry is already present
+                    // (written by the blob manager)
+                    info.granuleID = info.history.get().value.granuleID;
+                } else {
+                    // FIXME: could avoid max uid for granule ids here
+                    // if this granule is not derived from a split or merge, create the granule id here
+                    info.granuleID = deterministicRandom()->randomUniqueID();
+                }
+                wait(tr.registerChangeFeed(StringRef(info.granuleID.toString()), req.keyRange));
                 info.doSnapshot = true;
                 info.previousDurableVersion = invalidVersion;
             }

-            tr.set(lockKey, blobGranuleLockValueFor(req.managerEpoch, req.managerSeqno, info.changeFeedId));
+            tr.set(lockKey, blobGranuleLockValueFor(req.managerEpoch, req.managerSeqno, info.granuleID));
             wait(krmSetRange(&tr, blobGranuleMappingKeys.begin, req.keyRange, blobGranuleMappingValueFor(bwData->id)));

-            Tuple historyKey;
-            historyKey.append(req.keyRange.begin).append(req.keyRange.end);
-            Optional<Value> parentGranulesValue =
-                wait(tr.get(historyKey.getDataAsStandalone().withPrefix(blobGranuleHistoryKeys.begin)));
-
-            // If anything in previousGranules, need to do the handoff logic and set ret.previousChangeFeedId,
-            // and the previous durable version will come from the previous granules
-            if (parentGranulesValue.present()) {
-                state Standalone<VectorRef<KeyRangeRef>> parentGranules =
-                    decodeBlobGranuleHistoryValue(parentGranulesValue.get());
+            // If anything is in previousGranules, we need to do the handoff logic and set
+            // ret.previousChangeFeedId, and the previous durable version will come from the previous granules
+            if (info.history.present() && info.history.get().value.parentGranules.size() > 0) {
                 // TODO REMOVE
                 if (BW_DEBUG) {
                     printf("Decoded parent granules for [%s - %s)\n",
                            req.keyRange.begin.printable().c_str(),
                            req.keyRange.end.printable().c_str());
-                    for (auto& pg : parentGranules) {
-                        printf("  [%s - %s)\n", pg.begin.printable().c_str(), pg.end.printable().c_str());
+                    for (auto& pg : info.history.get().value.parentGranules) {
+                        printf("  [%s - %s) @ %lld\n",
+                               pg.first.begin.printable().c_str(),
+                               pg.first.end.printable().c_str(),
+                               pg.second);
                     }
                 }

                 // TODO change this for merge
-                ASSERT(parentGranules.size() == 1);
+                ASSERT(info.history.get().value.parentGranules.size() == 1);
+                state KeyRange parentGranuleRange = info.history.get().value.parentGranules[0].first;

-                state std::pair<BlobGranuleSplitState, Version> granuleSplitState;
-                if (hasPrevOwner) {
-                    std::pair<BlobGranuleSplitState, Version> _st =
-                        wait(getGranuleSplitState(&tr, parentGranules[0], req.keyRange));
-                    granuleSplitState = _st;
-                } else {
-                    granuleSplitState = std::pair(BlobGranuleSplitState::Started, invalidVersion);
-                }
+                Optional<Value> parentGranuleLockValue = wait(tr.get(blobGranuleLockKeyFor(parentGranuleRange)));
+                if (parentGranuleLockValue.present()) {
+                    std::tuple<int64_t, int64_t, UID> parentGranuleLock =
+                        decodeBlobGranuleLockValue(parentGranuleLockValue.get());
+                    UID parentGranuleID = std::get<2>(parentGranuleLock);
+                    if (BW_DEBUG) {
+                        printf("  parent granule id %s\n", parentGranuleID.toString().c_str());
+                    }

-                ASSERT(!hasPrevOwner || granuleSplitState.first > BlobGranuleSplitState::Started);
+                    info.parentGranule = std::pair(parentGranuleRange, parentGranuleID);

-                // if granule wasn't done with old change feed, load it
-                if (granuleSplitState.first < BlobGranuleSplitState::Done) {
-                    Optional<Value> prevGranuleLockValue = wait(tr.get(granuleLockKey(parentGranules[0])));
-                    ASSERT(prevGranuleLockValue.present());
-                    std::tuple<int64_t, int64_t, UID> prevGranuleLock =
-                        decodeBlobGranuleLockValue(prevGranuleLockValue.get());
-                    info.prevChangeFeedId = std::get<2>(prevGranuleLock);
-                    info.granuleSplitFrom = parentGranules[0];
+                    state std::pair<BlobGranuleSplitState, Version> granuleSplitState =
+                        std::pair(BlobGranuleSplitState::Started, invalidVersion);
+                    if (hasPrevOwner) {
+                        std::pair<BlobGranuleSplitState, Version> _gss =
+                            wait(getGranuleSplitState(&tr, parentGranuleID, info.granuleID));
+                        granuleSplitState = _gss;
+                    }

                     if (granuleSplitState.first == BlobGranuleSplitState::Assigned) {
                         // was already assigned, use change feed start version
@@ -1939,22 +2147,25 @@ ACTOR Future<GranuleChangeFeedInfo> persistAssignWorkerRange(Reference<BlobWorkerData> bwData,
-            TraceEvent("PersistAssignGranule", bwData->id).detail("Granule", req.keyRange);
+            TraceEvent("GranuleOpen", bwData->id).detail("Granule", req.keyRange);
             return info;
         } catch (Error& e) {
@@ -1987,8 +2196,9 @@ ACTOR Future<GranuleChangeFeedInfo> persistAssignWorkerRange(Reference<BlobWorkerData> bwData,
 ACTOR Future<Void> start(Reference<BlobWorkerData> bwData, GranuleRangeMetadata* meta, AssignBlobRangeRequest req) {
     ASSERT(meta->activeMetadata.isValid());
     meta->activeMetadata->originalReq = req;
-    meta->assignFuture = persistAssignWorkerRange(bwData, req);
+    meta->assignFuture = openGranule(bwData, req);
     meta->fileUpdaterFuture = blobGranuleUpdateFiles(bwData, meta->activeMetadata, meta->assignFuture);
+    meta->historyLoaderFuture = blobGranuleLoadHistory(bwData, meta->activeMetadata, meta->assignFuture);
     wait(success(meta->assignFuture));
     return Void();
 }
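The FIXME in openGranule above notes that the change feed key is currently the UID rendered as a 32-character hex string. A sketch of the more compact encoding it suggests (a hypothetical helper, not part of this patch):

    Key changeFeedKeyForGranule(UID granuleID) {
        BinaryWriter wr(Unversioned());
        wr << granuleID; // writes the UID's two raw 64-bit halves: 16 bytes instead of 32
        return wr.toValue();
    }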
diff --git a/fdbserver/workloads/BlobGranuleVerifier.actor.cpp b/fdbserver/workloads/BlobGranuleVerifier.actor.cpp
index 763d956b88..536a9c4968 100644
--- a/fdbserver/workloads/BlobGranuleVerifier.actor.cpp
+++ b/fdbserver/workloads/BlobGranuleVerifier.actor.cpp
@@ -52,6 +52,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
     int64_t mismatches = 0;
     int64_t initialReads = 0;
     int64_t timeTravelReads = 0;
+    int64_t timeTravelTooOld = 0;
     int64_t rowsRead = 0;
     int64_t bytesRead = 0;
     std::vector<Future<Void>> clients;

@@ -312,7 +313,6 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
     };

     ACTOR Future<Void> verifyGranules(Database cx, BlobGranuleVerifierWorkload* self) {
-        // TODO add time travel + verification
         state double last = now();
         state double endTime = last + self->testDuration;
         state std::map<double, OldRead> timeTravelChecks;

@@ -340,11 +340,17 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
                 timeTravelIt = timeTravelChecks.erase(timeTravelIt);

                 // advance iterator before doing read, so if it gets an error we don't retry it
-                std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> reReadResult =
-                    wait(self->readFromBlob(cx, self, oldRead.range, oldRead.v));
-                self->compareResult(oldRead.oldResult, reReadResult, oldRead.range, oldRead.v, false);
-
-                self->timeTravelReads++;
+                try {
+                    std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> reReadResult =
+                        wait(self->readFromBlob(cx, self, oldRead.range, oldRead.v));
+                    self->compareResult(oldRead.oldResult, reReadResult, oldRead.range, oldRead.v, false);
+                    self->timeTravelReads++;
+                } catch (Error& e) {
+                    if (e.code() == error_code_transaction_too_old) {
+                        self->timeTravelTooOld++;
+                        // TODO: add debugging info for when this is a failure
+                    }
+                }
             }

             // pick a random range
@@ -404,6 +410,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
         state Version readVersion = wait(tr.getReadVersion());

         state int checks = 0;
+        state bool availabilityPassed = true;
         state std::vector<KeyRange> allRanges = self->granuleRanges.get();
         for (auto& range : allRanges) {
             state KeyRange r = range;
@@ -437,20 +444,24 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
                            last.begin.printable().c_str(),
                            last.end.printable().c_str());
                 }
-                throw e;
+                availabilityPassed = false;
+                break;
             }
         }
     }
     printf("Blob Granule Verifier finished with:\n");
+    printf("  %d successful final granule checks\n", checks);
+    printf("  %d failed final granule checks\n", availabilityPassed ? 0 : 1);
     printf("  %lld mismatches\n", self->mismatches);
+    printf("  %lld time travel too old\n", self->timeTravelTooOld);
     printf("  %lld errors\n", self->errors);
     printf("  %lld initial reads\n", self->initialReads);
     printf("  %lld time travel reads\n", self->timeTravelReads);
     printf("  %lld rows\n", self->rowsRead);
     printf("  %lld bytes\n", self->bytesRead);
-    printf("  %d final granule checks\n", checks);
+    // FIXME: add above as details
     TraceEvent("BlobGranuleVerifierChecked");
-    return self->mismatches == 0 && checks > 0;
+    return availabilityPassed && self->mismatches == 0 && checks > 0 && self->timeTravelTooOld == 0;
 }

 Future<bool> check(Database const& cx) override {
diff --git a/flow/ProtocolVersion.h b/flow/ProtocolVersion.h
index ebf8df96bf..098ca011be 100644
--- a/flow/ProtocolVersion.h
+++ b/flow/ProtocolVersion.h
@@ -140,6 +140,7 @@ public: // introduced features
     PROTOCOL_VERSION_FEATURE(0x0FDB00B070010001LL, SpanContext);
     PROTOCOL_VERSION_FEATURE(0x0FDB00B070010001LL, ChangeFeed);
     PROTOCOL_VERSION_FEATURE(0x0FDB00B070010001LL, TSS);
+    PROTOCOL_VERSION_FEATURE(0x0FDB00B070010001LL, BlobGranule); // TODO make this 7.1 or 7.2?
 };

 template <>
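For context on the new feature flag: the PROTOCOL_VERSION_FEATURE macro generates a hasBlobGranule() accessor and a ProtocolVersion::withBlobGranule() constant, so version-gated serialization code can test for the feature. A typical consumer would look like this (sketch; the function name is invented here):

    void maybeReadBlobGranuleKeys(ProtocolVersion protocolVersion) {
        if (protocolVersion.hasBlobGranule()) {
            // peer understands the blob granule system key encodings added in this patch
        }
    }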