First version of key-sorted delta files

This commit is contained in:
Josh Slocum 2022-07-22 11:43:49 -05:00
parent 80f4c059f1
commit 095a5a4868
5 changed files with 626 additions and 85 deletions

View File

@ -440,9 +440,9 @@ struct IndexedBlobGranuleFile {
// Non-serialized member fields
StringRef fileBytes;
void init(const Optional<BlobGranuleCipherKeysCtx> cipherKeysCtx) {
void init(uint8_t fType, const Optional<BlobGranuleCipherKeysCtx> cipherKeysCtx) {
formatVersion = LATEST_BG_FORMAT_VERSION;
fileType = SNAPSHOT_FILE_TYPE;
fileType = fType;
chunkStartOffset = -1;
}
@ -552,16 +552,47 @@ Value serializeIndexBlock(Standalone<IndexedBlobGranuleFile>& file, Optional<Blo
return ObjectWriter::toValue(file, Unversioned());
}
// TODO: this should probably be in actor file with yields?
// Assembles the final file bytes: serializes the index block into the reserved chunks[0] slot,
// then concatenates every chunk into one contiguous buffer owned by the returned value's arena.
// previousChunkBytes is the total byte size of chunks[1..n], which the caller already serialized.
Value serializeFileFromChunks(Standalone<IndexedBlobGranuleFile>& file,
Optional<BlobGranuleCipherKeysCtx> cipherKeysCtx,
std::vector<Value>& chunks,
int previousChunkBytes) {
Value indexBlockBytes = serializeIndexBlock(file, cipherKeysCtx);
int32_t indexSize = indexBlockBytes.size();
// chunks[0] was pushed by the caller as a placeholder for the index block
chunks[0] = indexBlockBytes;
// TODO: write this directly to stream to avoid extra copy?
Arena ret;
size_t size = indexSize + previousChunkBytes;
uint8_t* buffer = new (ret) uint8_t[size];
uint8_t* bufferStart = buffer;
// idx is only used for debug tracing below
int idx = 0;
for (auto& it : chunks) {
if (BG_ENCRYPT_COMPRESS_DEBUG) {
TraceEvent(SevDebug, "SerializeFile")
.detail("ChunkIdx", idx++)
.detail("Size", it.size())
.detail("Offset", buffer - bufferStart);
}
buffer = it.copyTo(buffer);
}
// every byte of the buffer must have been written exactly once
ASSERT(size == buffer - bufferStart);
return Standalone<StringRef>(StringRef(bufferStart, size), ret);
}
// TODO: this should probably be in actor file with yields? - move writing logic to separate actor file in server?
// TODO: optimize memory copying
// TODO: sanity check no oversized files
// TODO: change to chunk size instead of target chunk count
Value serializeChunkedSnapshot(Standalone<GranuleSnapshot> snapshot,
int chunkCount,
Optional<CompressionFilter> compressFilter,
Optional<BlobGranuleCipherKeysCtx> cipherKeysCtx) {
Standalone<IndexedBlobGranuleFile> file;
file.init(cipherKeysCtx);
file.init(SNAPSHOT_FILE_TYPE, cipherKeysCtx);
size_t targetChunkBytes = snapshot.expectedSize() / chunkCount;
size_t currentChunkBytesEstimate = 0;
@ -612,42 +643,12 @@ Value serializeChunkedSnapshot(Standalone<GranuleSnapshot> snapshot,
file.arena(), keyAfter(snapshot.back().key), previousChunkBytes);
}
Value indexBlockBytes = serializeIndexBlock(file, cipherKeysCtx);
int32_t indexSize = indexBlockBytes.size();
chunks[0] = indexBlockBytes;
// TODO: write this directly to stream to avoid extra copy?
Arena ret;
size_t size = indexSize + previousChunkBytes;
uint8_t* buffer = new (ret) uint8_t[size];
previousChunkBytes = 0;
int idx = 0;
for (auto& it : chunks) {
if (BG_ENCRYPT_COMPRESS_DEBUG) {
TraceEvent(SevDebug, "SerializeSnapshot")
.detail("ChunkIdx", idx++)
.detail("Size", it.size())
.detail("Offset", previousChunkBytes);
}
memcpy(buffer + previousChunkBytes, it.begin(), it.size());
previousChunkBytes += it.size();
}
ASSERT(size == previousChunkBytes);
return Standalone<StringRef>(StringRef(buffer, size), ret);
}
Value serializeDeltaFile(Standalone<GranuleDeltas> deltas) {
// FIXME: better format
return ObjectWriter::toValue(deltas, Unversioned());
return serializeFileFromChunks(file, cipherKeysCtx, chunks, previousChunkBytes);
}
// TODO: use redwood prefix trick to optimize cpu comparison
static Arena loadSnapshotFile(const StringRef& snapshotData,
KeyRangeRef keyRange,
const KeyRangeRef& keyRange,
std::map<KeyRef, ValueRef>& dataMap,
Optional<BlobGranuleCipherKeysCtx> cipherKeysCtx) {
Arena rootArena;
@ -695,6 +696,378 @@ static Arena loadSnapshotFile(const StringRef& snapshotData,
return rootArena;
}
typedef std::map<Key, Standalone<DeltaBoundaryRef>> MutationBufferT;
// FIXME: optimize all of this with common prefix comparison stuff
// Returns the boundary entry for 'boundary', inserting a new one if it is not already present.
// Precondition: deltasByKey already contains entries for the file range's begin and end keys and
// boundary lies within that range, so lower_bound always finds an entry.
MutationBufferT::iterator insertMutationBoundary(MutationBufferT& deltasByKey, const KeyRef& boundary) {
// Find the first split point in buffer that is >= key
auto it = deltasByKey.lower_bound(boundary);
// Since the map contains fileRange already, we had to have found something
ASSERT(it != deltasByKey.end());
if (it->first == boundary) {
return it;
}
// new boundary, using find as insert hint
it = deltasByKey.insert(it, { boundary, Standalone<DeltaBoundaryRef>() });
// look back at previous entry to see if this boundary is already cleared to at a prior version
ASSERT(it != deltasByKey.begin());
auto itPrev = it;
--itPrev;
// If the preceding boundary's range clear extends past it, this new boundary was created in the
// middle of that clear, so it inherits the clear version and starts with a clear entry.
if (itPrev->second.clearVersion.present()) {
it->second.clearVersion = itPrev->second.clearVersion;
it->second.values.push_back(it->second.arena(), ValueAndVersionRef(it->second.clearVersion.get()));
}
return it;
}
void sortDeltasByKey(const Standalone<GranuleDeltas>& deltasByVersion,
const KeyRangeRef& fileRange,
MutationBufferT& deltasByKey) {
if (deltasByVersion.empty()) {
return;
}
if (deltasByKey.empty()) {
deltasByKey.insert({ fileRange.begin, Standalone<DeltaBoundaryRef>() });
deltasByKey.insert({ fileRange.end, Standalone<DeltaBoundaryRef>() });
}
for (auto& it : deltasByVersion) {
for (auto& m : it.mutations) {
// TODO REMOVE validation
ASSERT(fileRange.contains(m.param1));
if (m.type == MutationRef::ClearRange) {
ASSERT(m.param2 <= fileRange.end);
// handle single key clear more efficiently
if (equalsKeyAfter(m.param1, m.param2)) {
MutationBufferT::iterator key = insertMutationBoundary(deltasByKey, m.param1);
// Add a clear to values if it's empty or the last item is not a clear
if (key->second.values.empty() || key->second.values.back().isSet()) {
key->second.values.push_back(key->second.arena(), ValueAndVersionRef(it.version));
}
} else {
// Update each boundary in the cleared range
MutationBufferT::iterator begin = insertMutationBoundary(deltasByKey, m.param1);
MutationBufferT::iterator end = insertMutationBoundary(deltasByKey, m.param2);
while (begin != end) {
// Set the rangeClearedVersion if not set
if (!begin->second.clearVersion.present()) {
begin->second.clearVersion = it.version;
}
// Add a clear to values if it's empty or the last item is not a clear
if (begin->second.values.empty() || begin->second.values.back().isSet()) {
begin->second.values.push_back(begin->second.arena(), ValueAndVersionRef(it.version));
}
++begin;
}
}
} else {
Standalone<DeltaBoundaryRef>& bound = insertMutationBoundary(deltasByKey, m.param1)->second;
// Add the set if values is empty or the last entry isn't set to exactly the same value
if (bound.values.empty() || bound.values.back().isClear() || bound.values.back().value != m.param2) {
bound.values.push_back(bound.arena(), ValueAndVersionRef(it.version, m.param2));
}
}
}
}
// TODO: could do a scan through map and coalesce clears (if any boundaries with exactly 1 mutation (clear) and same
// clearVersion as previous guy)
// TODO REMOVE: print at end
/*fmt::print("Sorted Deltas ({0}):\n", deltasByKey.size());
for (auto& it : deltasByKey) {
fmt::print(" {0}) ({1})\n", it.first.printable(), it.second.values.size());
for (auto& it2 : it.second.values) {
if (it2.isSet()) {
fmt::print(" {0}) =\n", it2.version);
} else if (it2.isClear()) {
fmt::print(" {0}) X\n", it2.version);
} else {
fmt::print(" {0})\n", it2.version);
}
}
if (it.second.clearVersion.present()) {
fmt::print(" Clear+ {0}\n", it.second.clearVersion.get());
}
}*/
}
// FIXME: Could maybe reduce duplicated code between this and chunkedSnapshot for chunking
// Serializes deltas as a chunked, key-sorted delta file: deltas are first re-sorted into
// key-ordered boundaries, then packed into chunks of roughly chunkSize bytes, each optionally
// compressed/encrypted, with an index block mapping each chunk's first key to its byte offset.
Value serializeChunkedDeltaFile(Standalone<GranuleDeltas> deltas,
const KeyRangeRef& fileRange,
int chunkSize,
Optional<CompressionFilter> compressFilter,
Optional<BlobGranuleCipherKeysCtx> cipherKeysCtx) {
Standalone<IndexedBlobGranuleFile> file;
file.init(DELTA_FILE_TYPE, cipherKeysCtx);
// build in-memory version of boundaries - TODO separate functions
MutationBufferT boundaries;
sortDeltasByKey(deltas, fileRange, boundaries);
std::vector<Value> chunks;
chunks.push_back(Value()); // dummy value for index block
Standalone<GranuleSortedDeltas> currentChunk;
size_t currentChunkBytesEstimate = 0;
size_t previousChunkBytes = 0;
// TODO REMOVE - for validation
KeyRef lastKey;
int i = 0;
for (auto& it : boundaries) {
// TODO REMOVE sanity check
if (i > 0) {
ASSERT(lastKey < it.first);
}
lastKey = it.first;
it.second.key = it.first;
currentChunk.boundaries.push_back_deep(currentChunk.arena(), it.second);
currentChunkBytesEstimate += it.second.totalSize();
// flush the chunk once it reaches the target size, and always flush on the last boundary
if (currentChunkBytesEstimate >= chunkSize || i == boundaries.size() - 1) {
// TODO: protocol version
Value serialized = ObjectWriter::toValue(currentChunk, Unversioned());
Value chunkBytes =
IndexBlobGranuleFileChunkRef::toBytes(cipherKeysCtx, compressFilter, serialized, file.arena());
chunks.push_back(chunkBytes);
// TODO remove validation
if (!file.indexBlockRef.block.children.empty()) {
ASSERT(file.indexBlockRef.block.children.back().key < currentChunk.boundaries.begin()->key);
}
// index each chunk by its first boundary key and its offset within the chunk section
file.indexBlockRef.block.children.emplace_back_deep(
file.arena(), currentChunk.boundaries.begin()->key, previousChunkBytes);
if (BG_ENCRYPT_COMPRESS_DEBUG) {
TraceEvent(SevDebug, "ChunkSize")
.detail("ChunkBytes", chunkBytes.size())
.detail("PrvChunkBytes", previousChunkBytes);
}
previousChunkBytes += chunkBytes.size();
currentChunkBytesEstimate = 0;
currentChunk = Standalone<GranuleSortedDeltas>();
}
i++;
}
// the flush-on-last-boundary rule above guarantees nothing is left unflushed
ASSERT(currentChunk.boundaries.empty());
// trailing sentinel child marking the end key of the last chunk
if (!deltas.empty()) {
file.indexBlockRef.block.children.emplace_back_deep(file.arena(), fileRange.end, previousChunkBytes);
}
return serializeFileFromChunks(file, cipherKeysCtx, chunks, previousChunkBytes);
}
// Effectively the single DeltaBoundaryRef reduced to one update, but also with the key and clear after information.
// Sometimes at a given version, the boundary may only be necessary to represent a clear version after this key, or just
// an update/clear to this key, or both.
struct ParsedDeltaBoundaryRef {
KeyRef key;
MutationRef::Type op; // SetValue, ClearRange, or NoOp
ValueRef value; // null unless op == SetValue
// whether a range clear covers keyAfter(key) up to the next boundary at the resolved version
bool clearAfter;
// op constructor
ParsedDeltaBoundaryRef() {}
explicit ParsedDeltaBoundaryRef(KeyRef key, bool clearAfter, const ValueAndVersionRef& valueAndVersion)
: key(key), op(valueAndVersion.op), value(valueAndVersion.value), clearAfter(clearAfter) {}
// noop constructor
explicit ParsedDeltaBoundaryRef(KeyRef key, bool clearAfter)
: key(key), op(MutationRef::Type::NoOp), clearAfter(clearAfter) {}
// deep-copy constructor; value only needs copying when this boundary is a set
ParsedDeltaBoundaryRef(Arena& arena, const ParsedDeltaBoundaryRef& copyFrom)
: key(arena, copyFrom.key), op(copyFrom.op), clearAfter(copyFrom.clearAfter) {
if (copyFrom.isSet()) {
value = StringRef(arena, copyFrom.value);
}
}
bool isSet() const { return op == MutationRef::SetValue; }
bool isClear() const { return op == MutationRef::ClearRange; }
// redundant when the boundary carries no update and does not change the clear-after state
bool redundant(bool prevClearAfter) const { return op == MutationRef::Type::NoOp && clearAfter == prevClearAfter; }
};
// TODO could move ParsedDeltaBoundaryRef struct type up to granule common and make this a member of DeltaBoundaryRef?
// Resolves a boundary to its effective state for a read window [beginVersion, readVersion]:
// the newest update with version <= readVersion (noop if none qualifies or it predates
// beginVersion), plus whether the trailing range clear is visible in that window.
ParsedDeltaBoundaryRef deltaAtVersion(const DeltaBoundaryRef& delta, Version beginVersion, Version readVersion) {
bool clearAfter = delta.clearVersion.present() && readVersion >= delta.clearVersion.get() &&
beginVersion <= delta.clearVersion.get();
if (delta.values.empty()) {
return ParsedDeltaBoundaryRef(delta.key, clearAfter);
}
// binary search (values are version-ordered) for the first update with version >= readVersion
auto valueAtVersion = std::lower_bound(delta.values.begin(),
delta.values.end(),
ValueAndVersionRef(readVersion),
ValueAndVersionRef::OrderByVersion());
if (valueAtVersion == delta.values.begin() && readVersion < valueAtVersion->version) {
// deltas are all higher than read version
return ParsedDeltaBoundaryRef(delta.key, clearAfter);
}
// lower_bound() found version >= readVersion, so if we're at the end or it's not equal, go back one
if (valueAtVersion == delta.values.end() || valueAtVersion->version > readVersion) {
valueAtVersion--;
}
ASSERT(readVersion >= valueAtVersion->version);
// now, handle beginVersion (if update < beginVersion, it's a noop)
if (valueAtVersion->version < beginVersion) {
return ParsedDeltaBoundaryRef(delta.key, clearAfter);
} else {
return ParsedDeltaBoundaryRef(delta.key, clearAfter, *valueAtVersion);
}
}
// Merges key-sorted parsed deltas into the (also key-sorted) snapshot dataMap.
// startClear indicates a range clear was active just before the first delta key, so snapshot
// rows that precede the first delta key must be erased.
void applyDeltasSorted(const Standalone<VectorRef<ParsedDeltaBoundaryRef>>& sortedDeltas,
bool startClear,
std::map<KeyRef, ValueRef>& dataMap) {
if (sortedDeltas.empty() && !startClear) {
return;
}
// sorted merge of 2 iterators
bool prevClear = startClear;
auto deltaIt = sortedDeltas.begin();
auto snapshotIt = dataMap.begin();
while (deltaIt != sortedDeltas.end() && snapshotIt != dataMap.end()) {
if (deltaIt->key < snapshotIt->first) {
// Delta is lower than snapshot. Insert new row, if the delta is a set. Ignore point clear and noop
if (deltaIt->isSet()) {
snapshotIt = dataMap.insert(snapshotIt, { deltaIt->key, deltaIt->value });
snapshotIt++;
}
prevClear = deltaIt->clearAfter;
deltaIt++;
} else if (snapshotIt->first < deltaIt->key) {
// Snapshot is lower than delta. Erase the current entry if the previous delta was a clearAfter
if (prevClear) {
snapshotIt = dataMap.erase(snapshotIt);
} else {
snapshotIt++;
}
} else {
// Delta and snapshot are for the same key. The delta is newer, so if it is a set, update the value, else if
// it's a clear, delete the value (ignore noop)
if (deltaIt->isSet()) {
snapshotIt->second = deltaIt->value;
} else if (deltaIt->isClear()) {
snapshotIt = dataMap.erase(snapshotIt);
}
// erase() already advanced the iterator; only advance explicitly for set/noop
if (!deltaIt->isClear()) {
snapshotIt++;
}
prevClear = deltaIt->clearAfter;
deltaIt++;
}
}
// Either we are out of deltas or out of snapshots.
// if snapshot remaining and prevClear last delta set, clear the rest of the map
if (prevClear && snapshotIt != dataMap.end()) {
dataMap.erase(snapshotIt, dataMap.end());
}
// Apply remaining sets from delta, with no remaining snapshot
while (deltaIt != sortedDeltas.end()) {
if (deltaIt->isSet()) {
snapshotIt = dataMap.insert(snapshotIt, { deltaIt->key, deltaIt->value });
}
deltaIt++;
}
}
// The arena owns the BoundaryDeltaRef struct data but the StringRef pointers point to data in deltaData, to avoid extra
// copying
// Reads a chunked, key-sorted delta file: walks the index chunks overlapping keyRange, resolves
// each boundary at [beginVersion, readVersion], and merges the results into dataMap.
Arena loadChunkedDeltaFile(const StringRef& deltaData,
const KeyRangeRef& keyRange,
Version beginVersion,
Version readVersion,
std::map<KeyRef, ValueRef>& dataMap,
Optional<BlobGranuleCipherKeysCtx> cipherKeysCtx) {
Standalone<VectorRef<ParsedDeltaBoundaryRef>> deltas;
Standalone<IndexedBlobGranuleFile> file = IndexedBlobGranuleFile::fromFileBytes(deltaData, cipherKeysCtx);
ASSERT(file.fileType == DELTA_FILE_TYPE);
ASSERT(file.chunkStartOffset > 0);
// empty delta file
if (file.indexBlockRef.block.children.empty()) {
return deltas.arena();
}
// a non-empty file has at least one data chunk plus the trailing end-key sentinel
ASSERT(file.indexBlockRef.block.children.size() >= 2);
// TODO: refactor this out of delta tree
// int commonPrefixLen = commonPrefixLength(index.dataBlockOffsets.front().first,
// index.dataBlockOffsets.back().first);
// find range of blocks needed to read
ChildBlockPointerRef* currentBlock = file.findStartBlock(keyRange.begin);
// TODO cpu optimize (key check per block, prefixes, optimize start of first block)
bool startClear = false;
bool prevClearAfter = false;
// the last child entry is the end sentinel, not a readable chunk
while (currentBlock != (file.indexBlockRef.block.children.end() - 1) && keyRange.end > currentBlock->key) {
Standalone<GranuleSortedDeltas> deltaBlock =
file.getChild<GranuleSortedDeltas>(currentBlock, cipherKeysCtx, file.chunkStartOffset);
ASSERT(!deltaBlock.boundaries.empty());
ASSERT(currentBlock->key == deltaBlock.boundaries.front().key);
// TODO refactor this into function to share with memory deltas
bool blockMemoryUsed = false;
for (auto& entry : deltaBlock.boundaries) {
ParsedDeltaBoundaryRef boundary = deltaAtVersion(entry, beginVersion, readVersion);
if (entry.key < keyRange.begin) {
// before the requested range: only track whether a clear extends into the range
startClear = boundary.clearAfter;
prevClearAfter = boundary.clearAfter;
} else if (entry.key < keyRange.end) {
if (!boundary.redundant(prevClearAfter)) {
deltas.push_back(deltas.arena(), boundary);
blockMemoryUsed = true;
prevClearAfter = boundary.clearAfter;
}
} else {
break;
}
}
// keep the decoded block's memory alive only if a retained boundary references it
if (blockMemoryUsed) {
deltas.arena().dependsOn(deltaBlock.arena());
}
currentBlock++;
}
// TODO REMOVE
/*fmt::print("Parsing deltas [{0} - {1}) @ {2} - {3}\n",
keyRange.begin.printable(),
keyRange.end.printable(),
beginVersion,
readVersion);
fmt::print("Parsed Deltas ({0}):\n", deltas.size());
if (startClear) {
fmt::print(" StartClear+\n");
}
for (auto& it : deltas) {
fmt::print(" {0}) {1}", it.key.printable(), it.isSet() ? " =" : (it.isClear() ? " X" : ""));
if (it.clearAfter) {
fmt::print(" (Clear+)");
}
fmt::print("\n");
}*/
// TODO REMOVE
// order sanity check for parsed deltas
for (int i = 0; i < deltas.size() - 1; i++) {
ASSERT(deltas[i].key < deltas[i + 1].key);
}
applyDeltasSorted(deltas, startClear, dataMap);
return deltas.arena();
}
static void applyDelta(const KeyRangeRef& keyRange, const MutationRef& m, std::map<KeyRef, ValueRef>& dataMap) {
if (m.type == MutationRef::ClearRange) {
if (m.param2 <= keyRange.begin || m.param1 >= keyRange.end) {
@ -732,12 +1105,12 @@ static void applyDelta(const KeyRangeRef& keyRange, const MutationRef& m, std::m
}
}
static void applyDeltas(const GranuleDeltas& deltas,
const KeyRangeRef& keyRange,
Version beginVersion,
Version readVersion,
Version& lastFileEndVersion,
std::map<KeyRef, ValueRef>& dataMap) {
static void applyDeltasByVersion(const GranuleDeltas& deltas,
const KeyRangeRef& keyRange,
Version beginVersion,
Version readVersion,
Version& lastFileEndVersion,
std::map<KeyRef, ValueRef>& dataMap) {
if (deltas.empty()) {
return;
}
@ -772,34 +1145,6 @@ static void applyDeltas(const GranuleDeltas& deltas,
lastFileEndVersion = deltas.back().version;
}
// Legacy (unchunked) delta file parsing: deserializes the whole file as version-ordered
// GranuleDeltas and applies them to dataMap. The returned arena owns the parsed data.
static Arena loadDeltaFile(StringRef deltaData,
KeyRangeRef keyRange,
Version beginVersion,
Version readVersion,
Version& lastFileEndVersion,
std::map<KeyRef, ValueRef>& dataMap) {
Arena parseArena;
GranuleDeltas deltas;
ObjectReader reader(deltaData.begin(), Unversioned());
reader.deserialize(FileIdentifierFor<GranuleDeltas>::value, deltas, parseArena);
if (BG_READ_DEBUG) {
fmt::print("Parsed {} deltas from file\n", deltas.size());
}
// TODO REMOVE sanity check
// verify the file's deltas are non-decreasing by version before applying them
for (int i = 0; i < deltas.size() - 1; i++) {
if (deltas[i].version > deltas[i + 1].version) {
fmt::print(
"BG VERSION ORDER VIOLATION IN DELTA FILE: '{0}', '{1}'\n", deltas[i].version, deltas[i + 1].version);
}
ASSERT(deltas[i].version <= deltas[i + 1].version);
}
applyDeltas(deltas, keyRange, beginVersion, readVersion, lastFileEndVersion, dataMap);
return parseArena;
}
RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk,
KeyRangeRef keyRange,
Version beginVersion,
@ -833,14 +1178,15 @@ RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk,
fmt::print("Applying {} delta files\n", chunk.deltaFiles.size());
}
for (int deltaIdx = 0; deltaIdx < chunk.deltaFiles.size(); deltaIdx++) {
Arena deltaArena = loadDeltaFile(
deltaFileData[deltaIdx], requestRange, beginVersion, readVersion, lastFileEndVersion, dataMap);
Arena deltaArena = loadChunkedDeltaFile(
deltaFileData[deltaIdx], requestRange, beginVersion, readVersion, dataMap, chunk.cipherKeysCtx);
arena.dependsOn(deltaArena);
}
if (BG_READ_DEBUG) {
fmt::print("Applying {} memory deltas\n", chunk.newDeltas.size());
}
applyDeltas(chunk.newDeltas, requestRange, beginVersion, readVersion, lastFileEndVersion, dataMap);
// TODO: also sort these and do merge
applyDeltasByVersion(chunk.newDeltas, requestRange, beginVersion, readVersion, lastFileEndVersion, dataMap);
RangeResult ret;
for (auto& it : dataMap) {
@ -1154,6 +1500,85 @@ TEST_CASE("/blobgranule/files/applyDelta") {
return Void();
}
void checkDeltaAtVersion(const ParsedDeltaBoundaryRef& expected,
const DeltaBoundaryRef& boundary,
Version beginVersion,
Version readVersion) {
fmt::print("Checking {0} - {1}\n", beginVersion, readVersion);
ParsedDeltaBoundaryRef actual = deltaAtVersion(boundary, beginVersion, readVersion);
ASSERT(expected.clearAfter == actual.clearAfter);
if (expected.op != actual.op) {
fmt::print("Expected op {0} != actual {1}\n", expected.op, actual.op);
}
ASSERT(expected.op == actual.op);
if (expected.isSet()) {
ASSERT(expected.value == actual.value);
} else {
ASSERT(actual.value.empty());
}
}
// Unit test for deltaAtVersion: covers empty boundaries, clear-only boundaries, and a boundary
// with both a set and a clear, across various [beginVersion, readVersion] windows.
TEST_CASE("/blobgranule/files/deltaAtVersion") {
Arena ar;
std::string keyStr = "k";
std::string aStr = "a";
KeyRef key(ar, keyStr);
// a set of "a" at version 3 and a clear at version 5
ValueAndVersionRef vv_a_3(3, ValueRef(ar, aStr));
ValueAndVersionRef vv_clear_5(5);
// expected parse outcomes
ParsedDeltaBoundaryRef resultEmpty(key, false);
ParsedDeltaBoundaryRef resultEmptyWithClear(key, true);
ParsedDeltaBoundaryRef resultSetA(key, false, vv_a_3);
ParsedDeltaBoundaryRef resultClearA(key, true, vv_clear_5);
// test empty boundary ref
DeltaBoundaryRef boundaryEmpty;
boundaryEmpty.key = key;
checkDeltaAtVersion(resultEmpty, boundaryEmpty, 0, 2);
// test empty boundary with clear
DeltaBoundaryRef boundaryEmptyWithClear;
boundaryEmptyWithClear.key = key;
boundaryEmptyWithClear.clearVersion = 5;
// higher read version includes clear
checkDeltaAtVersion(resultEmptyWithClear, boundaryEmptyWithClear, 0, 5);
checkDeltaAtVersion(resultEmptyWithClear, boundaryEmptyWithClear, 0, 10);
checkDeltaAtVersion(resultEmptyWithClear, boundaryEmptyWithClear, 2, 5);
checkDeltaAtVersion(resultEmptyWithClear, boundaryEmptyWithClear, 2, 10);
checkDeltaAtVersion(resultEmptyWithClear, boundaryEmptyWithClear, 5, 10);
checkDeltaAtVersion(resultEmptyWithClear, boundaryEmptyWithClear, 5, 5);
// lower read version does not include clear
checkDeltaAtVersion(resultEmpty, boundaryEmptyWithClear, 0, 4);
checkDeltaAtVersion(resultEmpty, boundaryEmptyWithClear, 3, 4);
// higher read version but also higher beginVersion does not include clear
checkDeltaAtVersion(resultEmpty, boundaryEmptyWithClear, 6, 10);
// check values
DeltaBoundaryRef fullBoundary;
fullBoundary.key = key;
fullBoundary.values.push_back(ar, vv_a_3);
fullBoundary.values.push_back(ar, vv_clear_5);
fullBoundary.clearVersion = 5;
checkDeltaAtVersion(resultEmpty, fullBoundary, 0, 2);
checkDeltaAtVersion(resultEmpty, fullBoundary, 6, 10);
// begin=4 excludes the set at 3; the clear at 5 is above read version 4
checkDeltaAtVersion(resultEmpty, fullBoundary, 4, 4);
checkDeltaAtVersion(resultSetA, fullBoundary, 0, 3);
checkDeltaAtVersion(resultSetA, fullBoundary, 3, 4);
checkDeltaAtVersion(resultClearA, fullBoundary, 0, 5);
checkDeltaAtVersion(resultClearA, fullBoundary, 0, 10);
checkDeltaAtVersion(resultClearA, fullBoundary, 3, 5);
checkDeltaAtVersion(resultClearA, fullBoundary, 4, 5);
return Void();
}
// picks a number between 2^minExp and 2^maxExp, but uniformly distributed over exponential buckets 2^n and 2^(n+1)
int randomExp(int minExp, int maxExp) {
if (minExp == maxExp) { // N=2, case
@ -1227,6 +1652,7 @@ struct KeyValueGen {
Version version = 0;
// encryption/compression settings
// TODO: possibly different cipher keys or meta context per file?
Optional<BlobGranuleCipherKeysCtx> cipherKeys;
Optional<CompressionFilter> compressFilter;
@ -1491,7 +1917,7 @@ void checkDeltaRead(const KeyValueGen& kvGen,
std::map<KeyRef, ValueRef> expectedData;
Version lastFileEndVersion = 0;
applyDeltas(data, range, beginVersion, readVersion, lastFileEndVersion, expectedData);
applyDeltasByVersion(data, range, beginVersion, readVersion, lastFileEndVersion, expectedData);
// actual answer
std::string filename = randomBGFilename(
@ -1547,6 +1973,7 @@ static std::tuple<KeyRange, Version, Version> randomizeKeyAndVersions(const KeyV
}
}
// TODO randomize begin and read version to sometimes +/- 1 and readRange begin and end to keyAfter sometimes
return { readRange, beginVersion, readVersion };
}
@ -1563,10 +1990,26 @@ Standalone<GranuleDeltas> genDeltas(KeyValueGen& kvGen, int targetBytes) {
TEST_CASE("/blobgranule/files/deltaFormatUnitTest") {
KeyValueGen kvGen;
int targetChunks = randomExp(0, 8);
int targetDataBytes = randomExp(0, 21);
int targetChunkSize = targetDataBytes / targetChunks;
Standalone<GranuleDeltas> data = genDeltas(kvGen, targetDataBytes);
Value serialized = serializeDeltaFile(data);
// TODO REMOVE
/*fmt::print("Deltas ({0})\n", data.size());
for (auto& it : data) {
fmt::print(" {0}) ({1})\n", it.version, it.mutations.size());
for (auto& it2 : it.mutations) {
if (it2.type == MutationRef::Type::SetValue) {
fmt::print(" {0}=\n", it2.param1.printable());
} else {
fmt::print(" {0} - {1}\n", it2.param1.printable(), it2.param2.printable());
}
}
}*/
Value serialized =
serializeChunkedDeltaFile(data, kvGen.allRange, targetChunkSize, kvGen.compressFilter, kvGen.cipherKeys);
// check whole file
checkDeltaRead(kvGen, kvGen.allRange, 0, data.back().version, data, &serialized);
@ -1597,7 +2040,7 @@ void checkGranuleRead(const KeyValueGen& kvGen,
}
}
Version lastFileEndVersion = 0;
applyDeltas(deltaData, range, beginVersion, readVersion, lastFileEndVersion, expectedData);
applyDeltasByVersion(deltaData, range, beginVersion, readVersion, lastFileEndVersion, expectedData);
// actual answer
Standalone<BlobGranuleChunkRef> chunk;
@ -1663,9 +2106,11 @@ TEST_CASE("/blobgranule/files/granuleReadUnitTest") {
KeyValueGen kvGen;
int targetSnapshotChunks = randomExp(0, 9);
int targetDeltaChunks = randomExp(0, 8);
int targetDataBytes = randomExp(12, 25);
int targetSnapshotBytes = (int)(deterministicRandom()->randomInt(0, targetDataBytes));
int targetDeltaBytes = targetDataBytes - targetSnapshotBytes;
int targetDeltaChunkSize = targetDeltaBytes / targetDeltaChunks;
Standalone<GranuleSnapshot> snapshotData = genSnapshot(kvGen, targetSnapshotBytes);
Standalone<GranuleDeltas> deltaData = genDeltas(kvGen, targetDeltaBytes);
@ -1684,7 +2129,9 @@ TEST_CASE("/blobgranule/files/granuleReadUnitTest") {
fileData.push_back_deep(fileData.arena(), deltaData[j]);
}
if (!fileData.empty()) {
Value serializedDelta = serializeDeltaFile(fileData);
// TODO: randomly make the last file just the in memory deltas sometimes
Value serializedDelta = serializeChunkedDeltaFile(
fileData, kvGen.allRange, targetDeltaChunkSize, kvGen.compressFilter, kvGen.cipherKeys);
serializedDeltaFiles.emplace_back(fileData.back().version, serializedDelta);
}
}

View File

@ -46,6 +46,7 @@ struct GranuleSnapshot : VectorRef<KeyValueRef> {
}
};
// Deltas in version order
struct GranuleDeltas : VectorRef<MutationsAndVersionRef> {
constexpr static FileIdentifier file_identifier = 8563013;
@ -55,6 +56,80 @@ struct GranuleDeltas : VectorRef<MutationsAndVersionRef> {
}
};
// Deltas in key order

// For key-ordered delta files, the format for both sets and range clears is that you store boundaries ordered by key.
// Each boundary has a corresponding key, zero or more versioned updates (ValueAndVersionRef), and optionally a clear
// from keyAfter(key) to the next boundary, at a version.
// In this form, one or more streams of delta boundaries can be merged with a snapshot to efficiently reconstruct the
// rows at a desired version.
// The concept of these versioned mutation boundaries is repurposed directly from a prior version of redwood, back when
// it supported versioned data.

// A single versioned update (set or clear) to one key.
struct ValueAndVersionRef {
	Version version;
	MutationRef::Type op; // only set/clear
	ValueRef value; // only present for set

	ValueAndVersionRef() {}
	// clear constructor
	explicit ValueAndVersionRef(Version version) : version(version), op(MutationRef::Type::ClearRange) {}
	// set constructor
	explicit ValueAndVersionRef(Version version, ValueRef value)
	  : version(version), op(MutationRef::Type::SetValue), value(value) {}
	// deep-copy constructor
	ValueAndVersionRef(Arena& arena, const ValueAndVersionRef& copyFrom)
	  : version(copyFrom.version), op(copyFrom.op), value(arena, copyFrom.value) {}

	// const so these can be queried through const references (consistent with ParsedDeltaBoundaryRef)
	bool isSet() const { return op == MutationRef::SetValue; }
	bool isClear() const { return op == MutationRef::ClearRange; }

	// in-memory footprint estimate, used for chunk sizing
	int totalSize() const { return sizeof(ValueAndVersionRef) + value.size(); }
	int expectedSize() const { return value.size(); }

	// comparator for binary-searching a version-ordered values vector
	struct OrderByVersion {
		bool operator()(ValueAndVersionRef const& a, ValueAndVersionRef const& b) const {
			return a.version < b.version;
		}
	};

	template <class Ar>
	void serialize(Ar& ar) {
		serializer(ar, version, op, value);
	}
};
// TODO: might be better to hide these struct implementations in the cpp instead of header if they're only internal to
// the file format and not referenced elsewhere?

// All versioned updates to a single key, plus the optional range clear extending from
// keyAfter(key) to the next boundary.
struct DeltaBoundaryRef {
	// key
	KeyRef key;
	// updates to exactly this key
	VectorRef<ValueAndVersionRef> values;
	// clear version from keyAfter(key) up to the next boundary
	Optional<Version> clearVersion;

	DeltaBoundaryRef() {}
	// deep-copy constructor
	DeltaBoundaryRef(Arena& ar, const DeltaBoundaryRef& copyFrom)
	  : key(ar, copyFrom.key), values(ar, copyFrom.values), clearVersion(copyFrom.clearVersion) {}

	// const for consistency with expectedSize() and ValueAndVersionRef::totalSize()
	int totalSize() const { return sizeof(DeltaBoundaryRef) + key.expectedSize() + values.expectedSize(); }
	int expectedSize() const { return key.expectedSize() + values.expectedSize(); }

	template <class Ar>
	void serialize(Ar& ar) {
		serializer(ar, key, values, clearVersion);
	}
};
// One serialized chunk of a key-sorted delta file: the boundaries for a contiguous key sub-range.
struct GranuleSortedDeltas {
constexpr static FileIdentifier file_identifier = 8183903;
VectorRef<DeltaBoundaryRef> boundaries;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, boundaries);
}
};
struct BlobGranuleCipherKeysMeta {
EncryptCipherDomainId textDomainId;
EncryptCipherBaseKeyId textBaseCipherId;

View File

@ -29,11 +29,13 @@
Value serializeChunkedSnapshot(Standalone<GranuleSnapshot> snapshot,
int chunks,
Optional<CompressionFilter> compressFilter,
Optional<BlobGranuleCipherKeysCtx> cipherKeysCtx = Optional<BlobGranuleCipherKeysCtx>());
Optional<BlobGranuleCipherKeysCtx> cipherKeysCtx = {});
Value serializeDeltaFile(Standalone<GranuleDeltas> deltas);
// FIXME: support sorted and chunked delta files
Value serializeChunkedDeltaFile(Standalone<GranuleDeltas> deltas,
const KeyRangeRef& fileRange,
int chunkSize,
Optional<CompressionFilter> compressFilter,
Optional<BlobGranuleCipherKeysCtx> cipherKeysCtx = {});
ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<BlobGranuleChunkRef>>& files,
const KeyRangeRef& keyRange,

View File

@ -602,7 +602,21 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
state std::string fileName = randomBGFilename(bwData->id, granuleID, currentDeltaVersion, ".delta");
state Value serialized = serializeDeltaFile(deltasToWrite);
state Optional<BlobGranuleCipherKeysCtx> cipherKeysCtx;
state Optional<BlobGranuleCipherKeysMeta> cipherKeysMeta;
state Arena arena;
// TODO support encryption, figure out proper state stuff
/*if (isBlobFileEncryptionSupported()) {
BlobGranuleCipherKeysCtx ciphKeysCtx = wait(getLatestGranuleCipherKeys(bwData, keyRange, &arena));
cipherKeysCtx = ciphKeysCtx;
cipherKeysMeta = BlobGranuleCipherKeysCtx::toCipherKeysMeta(cipherKeysCtx.get());
}*/
Optional<CompressionFilter> compressFilter = getBlobFileCompressFilter();
// TODO KNOB
state Value serialized =
serializeChunkedDeltaFile(deltasToWrite, keyRange, 16 * 1024, compressFilter, cipherKeysCtx);
state size_t serializedSize = serialized.size();
// Free up deltasToWrite here to reduce memory
@ -640,7 +654,7 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
Key dfKey = blobGranuleFileKeyFor(granuleID, currentDeltaVersion, 'D');
// TODO change once we support file multiplexing
Value dfValue = blobGranuleFileValueFor(fname, 0, serializedSize, serializedSize);
Value dfValue = blobGranuleFileValueFor(fname, 0, serializedSize, serializedSize, cipherKeysMeta);
tr->set(dfKey, dfValue);
if (oldGranuleComplete.present()) {
@ -668,7 +682,7 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
wait(delay(deterministicRandom()->random01()));
}
// FIXME: change when we implement multiplexing
return BlobFileIndex(currentDeltaVersion, fname, 0, serializedSize, serializedSize);
return BlobFileIndex(currentDeltaVersion, fname, 0, serializedSize, serializedSize, cipherKeysMeta);
} catch (Error& e) {
wait(tr->onError(e));
}
@ -970,6 +984,7 @@ ACTOR Future<BlobFileIndex> compactFromBlob(Reference<BlobWorkerData> bwData,
snapshotF.cipherKeysMeta);
// TODO: optimization - batch 'encryption-key' lookup given the GranuleFile set is known
// FIXME: get cipher keys for delta as well!
if (chunk.snapshotFile.get().cipherKeysMetaRef.present()) {
ASSERT(isBlobFileEncryptionSupported());
BlobGranuleCipherKeysCtx cipherKeysCtx =
@ -3187,6 +3202,8 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
getGranuleCipherKeys(bwData, chunk.snapshotFile.get().cipherKeysMetaRef.get(), &rep.arena);
}
// FIXME: get cipher keys for delta files too!
// new deltas (if version is larger than version of last delta file)
// FIXME: do trivial key bounds here if key range is not fully contained in request key
// range

View File

@ -7,4 +7,4 @@ startDelay = 0
[[test.workload]]
testName = 'UnitTests'
maxTestCases = 1
testsMatching = '/'
testsMatching = '/blobgranule/files/deltaFormatUnitTest'