diff --git a/fdbclient/BlobGranuleFiles.cpp b/fdbclient/BlobGranuleFiles.cpp index 4f4c7f1809..d18c745ce4 100644 --- a/fdbclient/BlobGranuleFiles.cpp +++ b/fdbclient/BlobGranuleFiles.cpp @@ -39,9 +39,11 @@ #include "fmt/format.h" #include +#include // for perf microbenchmark #include #define BG_READ_DEBUG false +#define BG_FILES_TEST_DEBUG false // Implements granule file parsing and materialization with normal c++ functions (non-actors) so that this can be used // outside the FDB network thread. @@ -57,6 +59,21 @@ uint16_t MIN_SUPPORTED_BG_FORMAT_VERSION = 1; const uint8_t SNAPSHOT_FILE_TYPE = 'S'; const uint8_t DELTA_FILE_TYPE = 'D'; +static int getDefaultCompressionLevel(CompressionFilter filter) { + if (filter == CompressionFilter::NONE) { + return -1; +#ifdef ZLIB_LIB_SUPPORTED + } else if (filter == CompressionFilter::GZIP) { + // opt for high speed compression, larger levels have a high cpu cost and not much compression ratio + // improvement, according to benchmarks + return 1; +#endif + } else { + ASSERT(false); + return -1; + } +} + // Deltas in key order // For key-ordered delta files, the format for both sets and range clears is that you store boundaries ordered by key. @@ -97,6 +114,39 @@ struct ValueAndVersionRef { } }; +// Effectively the single DeltaBoundaryRef reduced to one update, but also with the key and clear after information. +// Sometimes at a given version, the boundary may only be necessary to represent a clear version after this key, or just +// an update/clear to this key, or both. +struct ParsedDeltaBoundaryRef { + KeyRef key; + MutationRef::Type op; // SetValue, ClearRange, or NoOp + ValueRef value; // null unless op == SetValue + bool clearAfter; + + // op constructor + ParsedDeltaBoundaryRef() {} + explicit ParsedDeltaBoundaryRef(KeyRef key, bool clearAfter, const ValueAndVersionRef& valueAndVersion) + : key(key), op(valueAndVersion.op), value(valueAndVersion.value), clearAfter(clearAfter) {} + // noop constructor + explicit ParsedDeltaBoundaryRef(KeyRef key, bool clearAfter) + : key(key), op(MutationRef::Type::NoOp), clearAfter(clearAfter) {} + // from snapshot set constructor + explicit ParsedDeltaBoundaryRef(const KeyValueRef& kv) + : key(kv.key), op(MutationRef::Type::SetValue), value(kv.value), clearAfter(false) {} + + ParsedDeltaBoundaryRef(Arena& arena, const ParsedDeltaBoundaryRef& copyFrom) + : key(arena, copyFrom.key), op(copyFrom.op), clearAfter(copyFrom.clearAfter) { + if (copyFrom.isSet()) { + value = StringRef(arena, copyFrom.value); + } + } + + bool isSet() const { return op == MutationRef::SetValue; } + bool isClear() const { return op == MutationRef::ClearRange; } + bool isNoOp() const { return op == MutationRef::NoOp; } + bool redundant(bool prevClearAfter) const { return op == MutationRef::Type::NoOp && clearAfter == prevClearAfter; } +}; + struct DeltaBoundaryRef { // key KeyRef key; @@ -285,7 +335,6 @@ struct IndexBlockRef { TraceEvent(SevDebug, "IndexBlockEncrypt_After").detail("Chksum", chksum); } - // TODO: Add version? ObjectReader dataReader(decrypted.begin(), IncludeVersion()); dataReader.deserialize(FileIdentifierFor::value, idxRef.block, arena); } @@ -299,7 +348,6 @@ struct IndexBlockRef { } else { TraceEvent("IndexBlockSize").detail("Sz", buffer.size()); - // TODO: Add version? ObjectReader dataReader(buffer.begin(), IncludeVersion()); dataReader.deserialize(FileIdentifierFor::value, block, arena); } @@ -411,7 +459,8 @@ struct IndexBlobGranuleFileChunkRef { const CompressionFilter compFilter, Arena& arena) { chunkRef.compressionFilter = compFilter; - chunkRef.buffer = CompressionUtils::compress(chunkRef.compressionFilter.get(), chunk.contents(), arena); + chunkRef.buffer = CompressionUtils::compress( + chunkRef.compressionFilter.get(), chunk.contents(), getDefaultCompressionLevel(compFilter), arena); if (BG_ENCRYPT_COMPRESS_DEBUG) { XXH64_hash_t chunkChksum = XXH3_64bits(chunk.contents().begin(), chunk.contents().size()); @@ -659,7 +708,7 @@ Value serializeFileFromChunks(Standalone& file, // TODO: optimize memory copying // TODO: sanity check no oversized files Value serializeChunkedSnapshot(const Standalone& fileNameRef, - Standalone snapshot, + const Standalone& snapshot, int targetChunkBytes, Optional compressFilter, Optional cipherKeysCtx) { @@ -728,12 +777,12 @@ Value serializeChunkedSnapshot(const Standalone& fileNameRef, } // TODO: use redwood prefix trick to optimize cpu comparison -static Arena loadSnapshotFile(const Standalone& fileName, - const StringRef& snapshotData, - const KeyRangeRef& keyRange, - std::map& dataMap, - Optional cipherKeysCtx) { - Arena rootArena; +static Standalone> loadSnapshotFile( + const Standalone& fileName, + const StringRef& snapshotData, + const KeyRangeRef& keyRange, + Optional cipherKeysCtx) { + Standalone> results; if (BG_ENCRYPT_COMPRESS_DEBUG) { TraceEvent(SevDebug, "LoadChunkedSnapshot") @@ -750,7 +799,7 @@ static Arena loadSnapshotFile(const Standalone& fileName, // empty snapshot file if (file.indexBlockRef.block.children.empty()) { - return rootArena; + return results; } ASSERT(file.indexBlockRef.block.children.size() >= 2); @@ -762,9 +811,17 @@ static Arena loadSnapshotFile(const Standalone& fileName, // find range of blocks needed to read ChildBlockPointerRef* currentBlock = file.findStartBlock(keyRange.begin); - // FIXME: optimize cpu comparisons here in first/last partial blocks, doing entire blocks at once based on - // comparison, and using shared prefix for key comparison - while (currentBlock != (file.indexBlockRef.block.children.end() - 1) && keyRange.end > currentBlock->key) { + if (currentBlock == (file.indexBlockRef.block.children.end() - 1) || keyRange.end <= currentBlock->key) { + return results; + } + + bool lastBlock = false; + + // FIXME: shared prefix for key comparison + while (!lastBlock) { + auto nextBlock = currentBlock; + nextBlock++; + lastBlock = (nextBlock == (file.indexBlockRef.block.children.end() - 1)) || (keyRange.end <= nextBlock->key); Standalone dataBlock = file.getChild(currentBlock, cipherKeysCtx, file.chunkStartOffset); ASSERT(!dataBlock.empty()); @@ -772,18 +829,24 @@ static Arena loadSnapshotFile(const Standalone& fileName, bool anyRows = false; for (auto& entry : dataBlock) { - if (entry.key >= keyRange.begin && entry.key < keyRange.end) { - dataMap.insert({ entry.key, entry.value }); + if (!results.empty() && !lastBlock) { + // no key comparisons needed + results.emplace_back(results.arena(), entry); anyRows = true; + } else if ((!results.empty() || entry.key >= keyRange.begin) && (!lastBlock || entry.key < keyRange.end)) { + results.emplace_back(results.arena(), entry); + anyRows = true; + } else if (!results.empty() && lastBlock) { + break; } } if (anyRows) { - rootArena.dependsOn(dataBlock.arena()); + results.arena().dependsOn(dataBlock.arena()); } currentBlock++; } - return rootArena; + return results; } typedef std::map> SortedDeltasT; @@ -847,6 +910,41 @@ void updateMutationBoundary(Standalone& boundary, const ValueA } } +void insertSortedDelta(const MutationRef& m, + const Version version, + const KeyRangeRef& fileRange, + SortedDeltasT& deltasByKey) { + // TODO REMOVE validation + ASSERT(fileRange.contains(m.param1)); + if (m.type == MutationRef::ClearRange) { + ASSERT(m.param2 <= fileRange.end); + // handle single key clear more efficiently + if (equalsKeyAfter(m.param1, m.param2)) { + SortedDeltasT::iterator key = insertMutationBoundary(deltasByKey, m.param1); + updateMutationBoundary(key->second, ValueAndVersionRef(version)); + } else { + // Update each boundary in the cleared range + SortedDeltasT::iterator begin = insertMutationBoundary(deltasByKey, m.param1); + SortedDeltasT::iterator end = insertMutationBoundary(deltasByKey, m.param2); + while (begin != end) { + // Set the rangeClearedVersion if not set + if (!begin->second.clearVersion.present()) { + begin->second.clearVersion = version; + } + + // Add a clear to values if it's empty or the last item is not a clear + if (begin->second.values.empty() || begin->second.values.back().isSet()) { + updateMutationBoundary(begin->second, ValueAndVersionRef(version)); + } + ++begin; + } + } + } else { + Standalone& bound = insertMutationBoundary(deltasByKey, m.param1)->second; + updateMutationBoundary(bound, ValueAndVersionRef(version, m.param2)); + } +} + // TODO: investigate more cpu-efficient sorting methods. Potential options: // 1) Replace std::map with ART mutation buffer // 2) sort updates and clear endpoints by (key, version), and keep track of active clears. @@ -862,35 +960,7 @@ void sortDeltasByKey(const Standalone& deltasByVersion, } for (auto& it : deltasByVersion) { for (auto& m : it.mutations) { - // TODO REMOVE validation - ASSERT(fileRange.contains(m.param1)); - if (m.type == MutationRef::ClearRange) { - ASSERT(m.param2 <= fileRange.end); - // handle single key clear more efficiently - if (equalsKeyAfter(m.param1, m.param2)) { - SortedDeltasT::iterator key = insertMutationBoundary(deltasByKey, m.param1); - updateMutationBoundary(key->second, ValueAndVersionRef(it.version)); - } else { - // Update each boundary in the cleared range - SortedDeltasT::iterator begin = insertMutationBoundary(deltasByKey, m.param1); - SortedDeltasT::iterator end = insertMutationBoundary(deltasByKey, m.param2); - while (begin != end) { - // Set the rangeClearedVersion if not set - if (!begin->second.clearVersion.present()) { - begin->second.clearVersion = it.version; - } - - // Add a clear to values if it's empty or the last item is not a clear - if (begin->second.values.empty() || begin->second.values.back().isSet()) { - updateMutationBoundary(begin->second, ValueAndVersionRef(it.version)); - } - ++begin; - } - } - } else { - Standalone& bound = insertMutationBoundary(deltasByKey, m.param1)->second; - updateMutationBoundary(bound, ValueAndVersionRef(it.version, m.param2)); - } + insertSortedDelta(m, it.version, fileRange, deltasByKey); } } @@ -900,7 +970,7 @@ void sortDeltasByKey(const Standalone& deltasByVersion, // FIXME: Could maybe reduce duplicated code between this and chunkedSnapshot for chunking Value serializeChunkedDeltaFile(const Standalone& fileNameRef, - Standalone deltas, + const Standalone& deltas, const KeyRangeRef& fileRange, int chunkSize, Optional compressFilter, @@ -979,35 +1049,6 @@ Value serializeChunkedDeltaFile(const Standalone& fileNameRef, return serializeFileFromChunks(file, cipherKeysCtx, chunks, previousChunkBytes); } -// Effectively the single DeltaBoundaryRef reduced to one update, but also with the key and clear after information. -// Sometimes at a given version, the boundary may only be necessary to represent a clear version after this key, or just -// an update/clear to this key, or both. -struct ParsedDeltaBoundaryRef { - KeyRef key; - MutationRef::Type op; // SetValue, ClearRange, or NoOp - ValueRef value; // null unless op == SetValue - bool clearAfter; - - // op constructor - ParsedDeltaBoundaryRef() {} - explicit ParsedDeltaBoundaryRef(KeyRef key, bool clearAfter, const ValueAndVersionRef& valueAndVersion) - : key(key), op(valueAndVersion.op), value(valueAndVersion.value), clearAfter(clearAfter) {} - // noop constructor - explicit ParsedDeltaBoundaryRef(KeyRef key, bool clearAfter) - : key(key), op(MutationRef::Type::NoOp), clearAfter(clearAfter) {} - ParsedDeltaBoundaryRef(Arena& arena, const ParsedDeltaBoundaryRef& copyFrom) - : key(arena, copyFrom.key), op(copyFrom.op), clearAfter(copyFrom.clearAfter) { - if (copyFrom.isSet()) { - value = StringRef(arena, copyFrom.value); - } - } - - bool isSet() const { return op == MutationRef::SetValue; } - bool isClear() const { return op == MutationRef::ClearRange; } - bool redundant(bool prevClearAfter) const { return op == MutationRef::Type::NoOp && clearAfter == prevClearAfter; } -}; - -// TODO could move ParsedDeltaBoundaryRef struct type up to granule common and make this a member of DeltaBoundaryRef? ParsedDeltaBoundaryRef deltaAtVersion(const DeltaBoundaryRef& delta, Version beginVersion, Version readVersion) { bool clearAfter = delta.clearVersion.present() && readVersion >= delta.clearVersion.get() && beginVersion <= delta.clearVersion.get(); @@ -1096,22 +1137,13 @@ void applyDeltasSorted(const Standalone>& sort // The arena owns the BoundaryDeltaRef struct data but the StringRef pointers point to data in deltaData, to avoid extra // copying -Arena loadChunkedDeltaFile(const Standalone& fileNameRef, - const StringRef& deltaData, - const KeyRangeRef& keyRange, - Version beginVersion, - Version readVersion, - std::map& dataMap, - Optional cipherKeysCtx) { - - if (BG_ENCRYPT_COMPRESS_DEBUG) { - TraceEvent(SevDebug, "LoadChunkedDelta") - .detail("FileName", fileNameRef.toString()) - .detail("RangeBegin", keyRange.begin.printable()) - .detail("RangeEnd", keyRange.end.printable()) - .detail("Encrypted", cipherKeysCtx.present()); - } - +Standalone> loadChunkedDeltaFile(const Standalone& fileNameRef, + const StringRef& deltaData, + const KeyRangeRef& keyRange, + Version beginVersion, + Version readVersion, + Optional cipherKeysCtx, + bool& startClear) { Standalone> deltas; Standalone file = IndexedBlobGranuleFile::fromFileBytes(deltaData, cipherKeysCtx); @@ -1120,7 +1152,7 @@ Arena loadChunkedDeltaFile(const Standalone& fileNameRef, // empty delta file if (file.indexBlockRef.block.children.empty()) { - return deltas.arena(); + return deltas; } ASSERT(file.indexBlockRef.block.children.size() >= 2); @@ -1132,10 +1164,19 @@ Arena loadChunkedDeltaFile(const Standalone& fileNameRef, // find range of blocks needed to read ChildBlockPointerRef* currentBlock = file.findStartBlock(keyRange.begin); - // TODO cpu optimize (key check per block, prefixes, optimize start of first block) - bool startClear = false; + if (currentBlock == (file.indexBlockRef.block.children.end() - 1) || keyRange.end <= currentBlock->key) { + // empty, done + return deltas; + } + + // TODO: could cpu optimize first block a bit more by seeking right to start + bool lastBlock = false; bool prevClearAfter = false; - while (currentBlock != (file.indexBlockRef.block.children.end() - 1) && keyRange.end > currentBlock->key) { + while (!lastBlock) { + auto nextBlock = currentBlock; + nextBlock++; + lastBlock = (nextBlock == file.indexBlockRef.block.children.end() - 1) || keyRange.end <= nextBlock->key; + Standalone deltaBlock = file.getChild(currentBlock, cipherKeysCtx, file.chunkStartOffset); ASSERT(!deltaBlock.boundaries.empty()); @@ -1146,16 +1187,18 @@ Arena loadChunkedDeltaFile(const Standalone& fileNameRef, for (auto& entry : deltaBlock.boundaries) { ParsedDeltaBoundaryRef boundary = deltaAtVersion(entry, beginVersion, readVersion); - if (entry.key < keyRange.begin) { + if (deltas.empty() && entry.key < keyRange.begin) { startClear = boundary.clearAfter; prevClearAfter = boundary.clearAfter; - } else if (entry.key < keyRange.end) { + } else if (!lastBlock || entry.key < keyRange.end) { if (!boundary.redundant(prevClearAfter)) { deltas.push_back(deltas.arena(), boundary); blockMemoryUsed = true; prevClearAfter = boundary.clearAfter; } } else { + // TODO REMOVE validation + ASSERT(lastBlock); break; } } @@ -1170,9 +1213,7 @@ Arena loadChunkedDeltaFile(const Standalone& fileNameRef, ASSERT(deltas[i].key < deltas[i + 1].key); } - applyDeltasSorted(deltas, startClear, dataMap); - - return deltas.arena(); + return deltas; } static void applyDelta(const KeyRangeRef& keyRange, const MutationRef& m, std::map& dataMap) { @@ -1252,6 +1293,188 @@ static void applyDeltasByVersion(const GranuleDeltas& deltas, lastFileEndVersion = deltas.back().version; } +// TODO: could optimize this slightly to avoid tracking multiple updates for the same key at all since it's always then +// collapsed to the last one +Standalone> sortMemoryDeltas(const GranuleDeltas& memoryDeltas, + const KeyRangeRef& granuleRange, + const KeyRangeRef& readRange, + Version beginVersion, + Version readVersion) { + ASSERT(!memoryDeltas.empty()); + + // filter by request range first + SortedDeltasT versionedBoundaries; + if (versionedBoundaries.empty()) { + versionedBoundaries.insert({ readRange.begin, Standalone() }); + versionedBoundaries.insert({ readRange.end, Standalone() }); + } + for (auto& it : memoryDeltas) { + for (auto& m : it.mutations) { + if (m.type == MutationRef::ClearRange) { + if (m.param2 > readRange.begin && m.param1 < readRange.end) { + KeyRangeRef clearRangeClipped = readRange & KeyRangeRef(m.param1, m.param2); + MutationRef clearClipped( + MutationRef::Type::ClearRange, clearRangeClipped.begin, clearRangeClipped.end); + insertSortedDelta(clearClipped, it.version, granuleRange, versionedBoundaries); + } + } else { + ASSERT(m.type == MutationRef::SetValue); + if (readRange.contains(m.param1)) { + insertSortedDelta(m, it.version, granuleRange, versionedBoundaries); + } + } + } + } + + // parse and collapse based on version + bool prevClearAfter = false; + Standalone> deltas; + + // remove extra ranges inserted from clears that partially overlap read range + auto itBegin = versionedBoundaries.begin(); + while (itBegin->first < readRange.begin) { + ++itBegin; + } + auto itEnd = versionedBoundaries.end(); + itEnd--; + while (itEnd->first > readRange.end) { + itEnd--; + } + itEnd++; + + while (itBegin != itEnd) { + itBegin->second.key = itBegin->first; + ParsedDeltaBoundaryRef boundary = deltaAtVersion(itBegin->second, beginVersion, readVersion); + if (!boundary.redundant(prevClearAfter)) { + deltas.push_back_deep(deltas.arena(), boundary); + prevClearAfter = boundary.clearAfter; + } + ++itBegin; + } + + return deltas; +} + +// does a sorted merge of the delta streams. +// In terms of write precedence, streams[i] < streams[i+1] +// Handles range clears by tracking the active clears when they start +struct MergeStreamNext { + KeyRef key; + int16_t streamIdx; + int dataIdx; +}; + +// the sort order is logically lower by key, and then higher by streamIdx +// because a priority queue is backwards, we invert that +struct OrderForPriorityQueue { + int commonPrefixLen; + OrderForPriorityQueue(int commonPrefixLen) : commonPrefixLen(commonPrefixLen) {} + + bool operator()(MergeStreamNext const& a, MergeStreamNext const& b) const { + int keyCmp = a.key.compareSuffix(b.key, commonPrefixLen); + if (keyCmp != 0) { + return keyCmp > 0; // reverse + } + return a.streamIdx < b.streamIdx; + } +}; + +typedef std::priority_queue, OrderForPriorityQueue> MergePQ; + +static RangeResult mergeDeltaStreams(const BlobGranuleChunkRef& chunk, + const std::vector>>& streams, + const std::vector startClears) { + ASSERT(streams.size() < std::numeric_limits::max()); + ASSERT(startClears.size() == streams.size()); + + int prefixLen = commonPrefixLength(chunk.keyRange.begin, chunk.keyRange.end); + + // next element for each stream + MergePQ next = MergePQ(OrderForPriorityQueue(prefixLen)); + + // efficiently find the highest stream's active clear + std::set> activeClears; + int16_t maxActiveClear = -1; + + // check if a given stream is actively clearing + bool clearActive[streams.size()]; + for (int16_t i = 0; i < streams.size(); i++) { + clearActive[i] = startClears[i]; + if (startClears[i]) { + activeClears.insert(i); + maxActiveClear = i; + } + if (streams[i].empty()) { + // single clear that entirely encases partial read bounds + ASSERT(clearActive[i]); + } else { + MergeStreamNext item; + item.key = streams[i][0].key; + item.streamIdx = i; + item.dataIdx = 0; + next.push(item); + } + } + + RangeResult result; + std::vector cur; + cur.reserve(streams.size()); + while (!next.empty()) { + cur.clear(); + cur.push_back(next.top()); + next.pop(); + + // next.top().key == cur.front().key but with suffix comparison + while (!next.empty() && cur.front().key.compareSuffix(next.top().key, prefixLen) == 0) { + cur.push_back(next.top()); + next.pop(); + } + + // un-set clears and find latest value for key (if present) + bool foundValue = false; + for (auto& it : cur) { + auto& v = streams[it.streamIdx][it.dataIdx]; + if (clearActive[it.streamIdx]) { + clearActive[it.streamIdx] = false; + activeClears.erase(it.streamIdx); + if (it.streamIdx == maxActiveClear) { + // re-get max active clear + maxActiveClear = activeClears.empty() ? -1 : *activeClears.begin(); + } + } + + // find value for this key (if any) + if (!foundValue && !v.isNoOp()) { + foundValue = true; + // if it's a clear, or maxActiveClear is higher, no value for this key + if (v.isSet() && maxActiveClear < it.streamIdx) { + KeyRef finalKey = + chunk.tenantPrefix.present() ? v.key.removePrefix(chunk.tenantPrefix.get()) : v.key; + result.push_back_deep(result.arena(), KeyValueRef(finalKey, v.value)); + } + } + } + + // advance streams and start clearAfter + for (auto& it : cur) { + if (streams[it.streamIdx][it.dataIdx].clearAfter) { + clearActive[it.streamIdx] = true; + activeClears.insert(it.streamIdx); + maxActiveClear = std::max(maxActiveClear, it.streamIdx); + } + // TODO: implement skipping if large clear!! + // if (maxClearIdx > it.streamIdx) - skip + it.dataIdx++; + if (it.dataIdx < streams[it.streamIdx].size()) { + it.key = streams[it.streamIdx][it.dataIdx].key; + next.push(it); + } + } + } + + return result; +} + RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk, KeyRangeRef keyRange, Version beginVersion, @@ -1267,8 +1490,6 @@ RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk, // FIXME: probably some threshold of a small percentage of the data is actually changed, where it makes sense to // just to dependsOn instead of copy, to use a little extra memory footprint to help cpu? Arena arena; - std::map dataMap; - Version lastFileEndVersion = invalidVersion; KeyRange requestRange; if (chunk.tenantPrefix.present()) { requestRange = keyRange.withPrefix(chunk.tenantPrefix.get()); @@ -1276,45 +1497,60 @@ RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk, requestRange = keyRange; } + std::vector>> streams; + std::vector startClears; + // +1 for possible snapshot, +1 for possible memory deltas + streams.reserve(chunk.deltaFiles.size() + 2); + if (snapshotData.present()) { ASSERT(chunk.snapshotFile.present()); - - Arena snapshotArena = loadSnapshotFile(chunk.snapshotFile.get().filename, - snapshotData.get(), - requestRange, - dataMap, - chunk.snapshotFile.get().cipherKeysCtx); - arena.dependsOn(snapshotArena); + Standalone> snapshotRows = + loadSnapshotFile(chunk.snapshotFile.get().filename, + snapshotData.get(), + requestRange, + chunk.snapshotFile.get().cipherKeysCtx); + if (!snapshotRows.empty()) { + streams.push_back(snapshotRows); + startClears.push_back(false); + arena.dependsOn(streams.back().arena()); + } } if (BG_READ_DEBUG) { fmt::print("Applying {} delta files\n", chunk.deltaFiles.size()); } for (int deltaIdx = 0; deltaIdx < chunk.deltaFiles.size(); deltaIdx++) { - Arena deltaArena = loadChunkedDeltaFile(chunk.deltaFiles[deltaIdx].filename, - deltaFileData[deltaIdx], - requestRange, - beginVersion, - readVersion, - dataMap, - chunk.deltaFiles[deltaIdx].cipherKeysCtx); - arena.dependsOn(deltaArena); + bool startClear = false; + auto deltaRows = loadChunkedDeltaFile(chunk.deltaFiles[deltaIdx].filename, + deltaFileData[deltaIdx], + requestRange, + beginVersion, + readVersion, + chunk.deltaFiles[deltaIdx].cipherKeysCtx, + startClear); + if (startClear || !deltaRows.empty()) { + streams.push_back(deltaRows); + startClears.push_back(startClear); + arena.dependsOn(streams.back().arena()); + } + arena.dependsOn(deltaRows.arena()); } if (BG_READ_DEBUG) { fmt::print("Applying {} memory deltas\n", chunk.newDeltas.size()); } - // TODO: also sort these and do merge - applyDeltasByVersion(chunk.newDeltas, requestRange, beginVersion, readVersion, lastFileEndVersion, dataMap); - - RangeResult ret; - for (auto& it : dataMap) { - ret.push_back_deep( - ret.arena(), - KeyValueRef(chunk.tenantPrefix.present() ? it.first.removePrefix(chunk.tenantPrefix.get()) : it.first, - it.second)); + if (!chunk.newDeltas.empty()) { + // TODO REMOVE validation + ASSERT(beginVersion <= chunk.newDeltas.front().version); + ASSERT(readVersion >= chunk.newDeltas.back().version); + auto memoryRows = sortMemoryDeltas(chunk.newDeltas, chunk.keyRange, requestRange, beginVersion, readVersion); + if (!memoryRows.empty()) { + streams.push_back(memoryRows); + startClears.push_back(false); + arena.dependsOn(streams.back().arena()); + } } - return ret; + return mergeDeltaStreams(chunk, streams, startClears); } struct GranuleLoadIds { @@ -1694,9 +1930,9 @@ TEST_CASE("/blobgranule/files/deltaAtVersion") { } void checkSnapshotEmpty(const Value& serialized, Key begin, Key end, Optional cipherKeysCtx) { - std::map result; Standalone fileNameRef = StringRef(); - Arena ar = loadSnapshotFile(fileNameRef, serialized, KeyRangeRef(begin, end), result, cipherKeysCtx); + Standalone> result = + loadSnapshotFile(fileNameRef, serialized, KeyRangeRef(begin, end), cipherKeysCtx); ASSERT(result.empty()); } @@ -1709,29 +1945,44 @@ void checkSnapshotRead(const Standalone& fileNameRef, Optional cipherKeysCtx) { ASSERT(beginIdx < endIdx); ASSERT(endIdx <= snapshot.size()); - std::map result; KeyRef beginKey = snapshot[beginIdx].key; Key endKey = endIdx == snapshot.size() ? keyAfter(snapshot.back().key) : snapshot[endIdx].key; KeyRangeRef range(beginKey, endKey); - Arena ar = loadSnapshotFile(fileNameRef, serialized, range, result, cipherKeysCtx); + fmt::print("Reading [{0} - {1})\n", beginKey.printable(), endKey.printable()); + + Standalone> result = + loadSnapshotFile(fileNameRef, serialized, range, cipherKeysCtx); if (result.size() != endIdx - beginIdx) { fmt::print("Read {0} rows != {1}\n", result.size(), endIdx - beginIdx); } + + if (BG_FILES_TEST_DEBUG) { + fmt::print("Expected Data {0}:\n", result.size()); + for (auto& it : result) { + fmt::print(" {0}=\n", it.key.printable()); + } + fmt::print("Actual Data {0}:\n", endIdx - beginIdx); + for (int i = beginIdx; i < endIdx; i++) { + fmt::print(" {0}=\n", snapshot[i].key.printable()); + } + } + ASSERT(result.size() == endIdx - beginIdx); for (auto& it : result) { - if (it.first != snapshot[beginIdx].key) { - fmt::print("Key {0} != {1}\n", it.first.printable(), snapshot[beginIdx].key.printable()); + ASSERT(it.isSet()); + if (it.key != snapshot[beginIdx].key) { + fmt::print("Key {0} != {1}\n", it.key.printable(), snapshot[beginIdx].key.printable()); } - ASSERT(it.first == snapshot[beginIdx].key); - if (it.first != snapshot[beginIdx].key) { + ASSERT(it.key == snapshot[beginIdx].key); + if (it.key != snapshot[beginIdx].key) { fmt::print("Value {0} != {1} for Key {2}\n", - it.second.printable(), + it.value.printable(), snapshot[beginIdx].value.printable(), - it.first.printable()); + it.key.printable()); } - ASSERT(it.second == snapshot[beginIdx].value); + ASSERT(it.value == snapshot[beginIdx].value); beginIdx++; } } @@ -1769,7 +2020,8 @@ struct KeyValueGen { targetKeyLength = deterministicRandom()->randomInt(4, uidSize); sharedPrefix = sharedPrefix.substr(0, sharedPrefixLen) + "_"; targetValueLength = deterministicRandom()->randomExp(0, 12); - allRange = KeyRangeRef(StringRef(sharedPrefix), LiteralStringRef("\xff")); + allRange = KeyRangeRef(StringRef(sharedPrefix), + sharedPrefix.size() == 0 ? LiteralStringRef("\xff") : strinc(StringRef(sharedPrefix))); if (deterministicRandom()->coinflip()) { clearFrequency = 0.0; @@ -1957,6 +2209,82 @@ Standalone genSnapshot(KeyValueGen& kvGen, int targetDataBytes) return data; } +Standalone genDeltas(KeyValueGen& kvGen, int targetBytes) { + Standalone data; + int totalDataBytes = 0; + while (totalDataBytes < targetBytes) { + data.push_back(data.arena(), kvGen.newDelta()); + totalDataBytes += data.back().expectedSize(); + } + return data; +} + +TEST_CASE("/blobgranule/files/validateEncryptionCompression") { + KeyValueGen kvGen; + + int targetSnapshotChunks = deterministicRandom()->randomExp(0, 9); + int targetDeltaChunks = deterministicRandom()->randomExp(0, 8); + int targetDataBytes = deterministicRandom()->randomExp(12, 25); + int targetSnapshotBytes = (int)(deterministicRandom()->randomInt(0, targetDataBytes)); + int targetDeltaBytes = targetDataBytes - targetSnapshotBytes; + + int targetSnapshotChunkSize = targetSnapshotBytes / targetSnapshotChunks; + int targetDeltaChunkSize = targetDeltaBytes / targetDeltaChunks; + + Standalone snapshotData = genSnapshot(kvGen, targetSnapshotBytes); + Standalone deltaData = genDeltas(kvGen, targetDeltaBytes); + fmt::print("{0} snapshot rows and {1} deltas\n", snapshotData.size(), deltaData.size()); + + Standalone fileNameRef = StringRef(); + + Arena ar; + BlobGranuleCipherKeysCtx cipherKeys = getCipherKeysCtx(ar); + std::vector encryptionModes = { false, true }; + std::vector> compressionModes; + compressionModes.push_back({}); +#ifdef ZLIB_LIB_SUPPORTED + compressionModes.push_back(CompressionFilter::GZIP); +#endif + + std::vector snapshotValues; + for (bool encryptionMode : encryptionModes) { + Optional keys = encryptionMode ? cipherKeys : Optional(); + for (auto& compressionMode : compressionModes) { + Value v = + serializeChunkedSnapshot(fileNameRef, snapshotData, targetSnapshotChunkSize, compressionMode, keys); + fmt::print("snapshot({0}, {1}): {2}\n", + encryptionMode, + compressionMode.present() ? CompressionUtils::toString(compressionMode.get()) : "", + v.size()); + for (auto& v2 : snapshotValues) { + ASSERT(v != v2); + } + snapshotValues.push_back(v); + } + } + fmt::print("Validated {0} encryption/compression combos for snapshot\n", snapshotValues.size()); + + std::vector deltaValues; + for (bool encryptionMode : encryptionModes) { + Optional keys = encryptionMode ? cipherKeys : Optional(); + for (auto& compressionMode : compressionModes) { + Value v = serializeChunkedDeltaFile( + fileNameRef, deltaData, kvGen.allRange, targetDeltaChunkSize, compressionMode, keys); + fmt::print("delta({0}, {1}): {2}\n", + encryptionMode, + compressionMode.present() ? CompressionUtils::toString(compressionMode.get()) : "", + v.size()); + for (auto& v2 : deltaValues) { + ASSERT(v != v2); + } + deltaValues.push_back(v); + } + } + fmt::print("Validated {0} encryption/compression combos for delta\n", deltaValues.size()); + + return Void(); +} + TEST_CASE("/blobgranule/files/snapshotFormatUnitTest") { // snapshot files are likely to have a non-trivial shared prefix since they're for a small contiguous key range KeyValueGen kvGen; @@ -2032,6 +2360,12 @@ void checkDeltaRead(const KeyValueGen& kvGen, std::map expectedData; Version lastFileEndVersion = 0; + fmt::print("Delta Read [{0} - {1}) @ {2} - {3}\n", + range.begin.printable(), + range.end.printable(), + beginVersion, + readVersion); + applyDeltasByVersion(data, range, beginVersion, readVersion, lastFileEndVersion, expectedData); // actual answer @@ -2047,6 +2381,17 @@ void checkDeltaRead(const KeyValueGen& kvGen, RangeResult actualData = materializeBlobGranule(chunk, range, beginVersion, readVersion, {}, serialized); + if (expectedData.size() != actualData.size()) { + fmt::print("Expected Data {0}:\n", expectedData.size()); + /*for (auto& it : expectedData) { + fmt::print(" {0}=\n", it.first.printable()); + }*/ + fmt::print("Actual Data {0}:\n", actualData.size()); + /*for (auto& it : actualData) { + fmt::print(" {0}=\n", it.key.printable()); + }*/ + } + ASSERT(expectedData.size() == actualData.size()); int i = 0; for (auto& it : expectedData) { @@ -2088,28 +2433,27 @@ static std::tuple randomizeKeyAndVersions(const KeyV return { readRange, beginVersion, readVersion }; } -Standalone genDeltas(KeyValueGen& kvGen, int targetBytes) { - Standalone data; - int totalDataBytes = 0; - while (totalDataBytes < targetBytes) { - data.push_back(data.arena(), kvGen.newDelta()); - totalDataBytes += data.back().expectedSize(); - } - return data; -} - TEST_CASE("/blobgranule/files/deltaFormatUnitTest") { KeyValueGen kvGen; Standalone fileNameRef = StringRef(std::string("test")); int targetChunks = deterministicRandom()->randomExp(0, 8); int targetDataBytes = deterministicRandom()->randomExp(0, 21); - int targetChunkSize = targetDataBytes / targetChunks; Standalone data = genDeltas(kvGen, targetDataBytes); fmt::print("Deltas ({0})\n", data.size()); + /*for (auto& it : data) { + fmt::print(" {0}) ({1})\n", it.version, it.mutations.size()); + for (auto& it2 : it.mutations) { + if (it2.type == MutationRef::Type::SetValue) { + fmt::print(" {0}=\n", it2.param1.printable()); + } else { + fmt::print(" {0} - {1}\n", it2.param1.printable(), it2.param2.printable()); + } + } + }*/ Value serialized = serializeChunkedDeltaFile( fileNameRef, data, kvGen.allRange, targetChunkSize, kvGen.compressFilter, kvGen.cipherKeys); @@ -2193,9 +2537,26 @@ void checkGranuleRead(const KeyValueGen& kvGen, } RangeResult actualData = materializeBlobGranule(chunk, range, beginVersion, readVersion, snapshotPtr, deltaPtrs); + if (expectedData.size() != actualData.size()) { + fmt::print("Expected Size {0} != Actual Size {1}\n", expectedData.size(), actualData.size()); + } + if (BG_FILES_TEST_DEBUG) { + fmt::print("Expected Data {0}:\n", expectedData.size()); + for (auto& it : expectedData) { + fmt::print(" {0}=\n", it.first.printable()); + } + fmt::print("Actual Data {0}:\n", actualData.size()); + for (auto& it : actualData) { + fmt::print(" {0}=\n", it.key.printable()); + } + } + ASSERT(expectedData.size() == actualData.size()); int i = 0; for (auto& it : expectedData) { + if (it.first != actualData[i].key) { + fmt::print("expected {0} != actual {1}\n", it.first.printable(), actualData[i].key.printable()); + } ASSERT(it.first == actualData[i].key); ASSERT(it.second == actualData[i].value); i++; @@ -2212,13 +2573,41 @@ TEST_CASE("/blobgranule/files/granuleReadUnitTest") { int targetSnapshotBytes = (int)(deterministicRandom()->randomInt(0, targetDataBytes)); int targetDeltaBytes = targetDataBytes - targetSnapshotBytes; + if (BG_FILES_TEST_DEBUG) { + fmt::print("Snapshot Chunks: {0}\nDelta Chunks: {1}\nSnapshot Bytes: {2}\nDelta Bytes: {3}\n", + targetSnapshotChunks, + targetDeltaChunks, + targetSnapshotBytes, + targetDeltaBytes); + } + int targetSnapshotChunkSize = targetSnapshotBytes / targetSnapshotChunks; int targetDeltaChunkSize = targetDeltaBytes / targetDeltaChunks; Standalone snapshotData = genSnapshot(kvGen, targetSnapshotBytes); + if (BG_FILES_TEST_DEBUG) { + fmt::print("Snapshot data: {0}\n", snapshotData.size()); + for (auto& it : snapshotData) { + fmt::print(" {0}=\n", it.key.printable()); + } + } Standalone deltaData = genDeltas(kvGen, targetDeltaBytes); fmt::print("{0} snapshot rows and {1} deltas\n", snapshotData.size(), deltaData.size()); + if (BG_FILES_TEST_DEBUG) { + fmt::print("Delta data: {0}\n", deltaData.size()); + for (auto& it : deltaData) { + fmt::print(" {0}) ({1})\n", it.version, it.mutations.size()); + for (auto& it2 : it.mutations) { + if (it2.type == MutationRef::Type::SetValue) { + fmt::print(" {0}=\n", it2.param1.printable()); + } else { + fmt::print(" {0} - {1}\n", it2.param1.printable(), it2.param2.printable()); + } + } + } + } + Value serializedSnapshot = serializeChunkedSnapshot( fileNameRef, snapshotData, targetSnapshotChunkSize, kvGen.compressFilter, kvGen.cipherKeys); @@ -2237,8 +2626,10 @@ TEST_CASE("/blobgranule/files/granuleReadUnitTest") { if (!fileData.empty()) { if (j == deltaData.size() && deterministicRandom()->coinflip()) { // if it's the last set of deltas, sometimes make them the memory deltas instead + fmt::print("Memory Deltas {0} - {1}\n", fileData.front().version, fileData.back().version); inMemoryDeltas = fileData; } else { + fmt::print("Delta file {0} - {1}\n", fileData.front().version, fileData.back().version); Standalone fileNameRef = StringRef("delta" + std::to_string(i)); Value serializedDelta = serializeChunkedDeltaFile(fileNameRef, fileData, @@ -2283,3 +2674,489 @@ TEST_CASE("/blobgranule/files/granuleReadUnitTest") { return Void(); } + +// performance micro-benchmarks + +struct FileSet { + std::tuple> snapshotFile; + std::vector>> deltaFiles; + Key commonPrefix; + KeyRange range; +}; + +std::pair parseFilename(const std::string& fname) { + auto dotPos = fname.find("."); + ASSERT(dotPos > 0); + std::string type = fname.substr(dotPos + 1); + ASSERT(type == "snapshot" || type == "delta"); + auto lastUnderscorePos = fname.rfind("_"); + ASSERT('V' == fname[lastUnderscorePos + 1]); + std::string versionString = fname.substr(lastUnderscorePos + 2, dotPos); + Version version = std::stoll(versionString); + return { type, version }; +} + +Value loadFileData(std::string filename) { + std::ifstream input(filename, std::ios::binary); + ASSERT(input.good()); + + // copies all data into buffer + std::vector buffer(std::istreambuf_iterator(input), {}); + Value v(StringRef(&buffer[0], buffer.size())); + fmt::print("Loaded {0} file bytes from {1}\n", v.size(), filename); + + input.close(); + return v; +} + +struct CommonPrefixStats { + // for computing common prefix details and stats + Key key; + int len = -1; + int64_t totalKeySize = 0; + int totalKeys = 0; + int minKeySize = 1000000000; + int maxKeySize = 0; + + void addKey(const KeyRef& k) { + if (len == -1) { + key = k; + len = k.size(); + } else { + len = std::min(len, commonPrefixLength(k, key)); + } + totalKeys++; + totalKeySize += k.size(); + minKeySize = std::min(minKeySize, k.size()); + maxKeySize = std::max(maxKeySize, k.size()); + } + + Key done() { + ASSERT(len >= 0); + fmt::print("Common prefix: {0}\nCommon Prefix Length: {1}\nAverage Key Size: {2}\nMin Key Size: {3}, Max Key " + "Size: {4}\n", + key.substr(0, len).printable(), + len, + totalKeySize / totalKeys, + minKeySize, + maxKeySize); + return key.substr(0, len); + } +}; + +FileSet loadFileSet(std::string basePath, const std::vector& filenames) { + FileSet files; + CommonPrefixStats stats; + for (int i = 0; i < filenames.size(); i++) { + auto parts = parseFilename(filenames[i]); + std::string type = parts.first; + Version version = parts.second; + if (type == "snapshot") { + std::string fpath = basePath + filenames[i]; + Value data = loadFileData(fpath); + + Arena arena; + GranuleSnapshot file; + ObjectReader dataReader(data.begin(), Unversioned()); + dataReader.deserialize(FileIdentifierFor::value, file, arena); + Standalone parsed(file, arena); + + fmt::print("Loaded {0} rows from snapshot file\n", parsed.size()); + files.snapshotFile = { filenames[i], version, data, parsed }; + + for (auto& it : parsed) { + stats.addKey(it.key); + } + } else { + std::string fpath = basePath + filenames[i]; + Value data = loadFileData(fpath); + + Arena arena; + GranuleDeltas file; + ObjectReader dataReader(data.begin(), Unversioned()); + dataReader.deserialize(FileIdentifierFor::value, file, arena); + Standalone parsed(file, arena); + + fmt::print("Loaded {0} deltas from delta file\n", parsed.size()); + files.deltaFiles.push_back({ filenames[i], version, data, parsed }); + + for (auto& it : parsed) { + for (auto& it2 : it.mutations) { + stats.addKey(it2.param1); + if (it2.type == MutationRef::Type::ClearRange) { + stats.addKey(it2.param2); + } + } + } + } + } + + files.commonPrefix = stats.done(); + if (files.commonPrefix.size() == 0) { + files.range = normalKeys; + } else { + files.range = KeyRangeRef(files.commonPrefix, strinc(files.commonPrefix)); + } + fmt::print("Range: [{0} - {1})\n", files.range.begin.printable(), files.range.end.printable()); + + return files; +} + +int WRITE_RUNS = 5; + +std::pair doSnapshotWriteBench(const Standalone& data, + bool chunked, + Optional cipherKeys, + Optional compressionFilter) { + Standalone fileNameRef = StringRef(); + int64_t serializedBytes = 0; + double elapsed = -timer_monotonic(); + for (int runI = 0; runI < WRITE_RUNS; runI++) { + if (!chunked) { + serializedBytes = ObjectWriter::toValue(data, Unversioned()).size(); + } else { + serializedBytes = + serializeChunkedSnapshot(fileNameRef, data, 64 * 1024, compressionFilter, cipherKeys).size(); + } + } + elapsed += timer_monotonic(); + elapsed /= WRITE_RUNS; + return { serializedBytes, elapsed }; +} + +std::pair doDeltaWriteBench(const Standalone& data, + const KeyRangeRef& fileRange, + bool chunked, + Optional cipherKeys, + Optional compressionFilter) { + Standalone fileNameRef = StringRef(); + int64_t serializedBytes = 0; + double elapsed = -timer_monotonic(); + for (int runI = 0; runI < WRITE_RUNS; runI++) { + if (!chunked) { + serializedBytes = ObjectWriter::toValue(data, Unversioned()).size(); + } else { + serializedBytes = + serializeChunkedDeltaFile(fileNameRef, data, fileRange, 32 * 1024, compressionFilter, cipherKeys) + .size(); + } + } + elapsed += timer_monotonic(); + elapsed /= WRITE_RUNS; + return { serializedBytes, elapsed }; +} + +FileSet rewriteChunkedFileSet(const FileSet& fileSet, + Optional keys, + Optional compressionFilter) { + Standalone fileNameRef = StringRef(); + FileSet newFiles; + newFiles.snapshotFile = fileSet.snapshotFile; + newFiles.deltaFiles = fileSet.deltaFiles; + newFiles.commonPrefix = fileSet.commonPrefix; + newFiles.range = fileSet.range; + + std::get<2>(newFiles.snapshotFile) = + serializeChunkedSnapshot(fileNameRef, std::get<3>(newFiles.snapshotFile), 64 * 1024, compressionFilter, keys); + for (auto& deltaFile : newFiles.deltaFiles) { + std::get<2>(deltaFile) = serializeChunkedDeltaFile( + fileNameRef, std::get<3>(deltaFile), fileSet.range, 32 * 1024, compressionFilter, keys); + } + + return newFiles; +} + +int READ_RUNS = 20; +std::pair doReadBench(const FileSet& fileSet, + bool chunked, + KeyRange readRange, + bool clearAllAtEnd, + Optional keys, + Optional compressionFilter) { + Version readVersion = std::get<1>(fileSet.deltaFiles.back()); + + Standalone chunk; + StringRef deltaPtrs[fileSet.deltaFiles.size()]; + + MutationRef clearAllAtEndMutation; + if (clearAllAtEnd) { + clearAllAtEndMutation = MutationRef(MutationRef::Type::ClearRange, readRange.begin, readRange.end); + } + if (chunked) { + size_t snapshotSize = std::get<3>(fileSet.snapshotFile).size(); + chunk.snapshotFile = + BlobFilePointerRef(chunk.arena(), std::get<0>(fileSet.snapshotFile), 0, snapshotSize, snapshotSize, keys); + + for (int i = 0; i < fileSet.deltaFiles.size(); i++) { + size_t deltaSize = std::get<3>(fileSet.deltaFiles[i]).size(); + chunk.deltaFiles.emplace_back_deep( + chunk.arena(), std::get<0>(fileSet.deltaFiles[i]), 0, deltaSize, deltaSize, keys); + deltaPtrs[i] = std::get<2>(fileSet.deltaFiles[i]); + } + + if (clearAllAtEnd) { + readVersion++; + MutationsAndVersionRef lastDelta; + lastDelta.version = readVersion; + lastDelta.mutations.push_back(chunk.arena(), clearAllAtEndMutation); + + chunk.newDeltas.push_back_deep(chunk.arena(), lastDelta); + } + + chunk.keyRange = fileSet.range; + chunk.includedVersion = readVersion; + chunk.snapshotVersion = std::get<1>(fileSet.snapshotFile); + } + + int64_t serializedBytes = 0; + double elapsed = -timer_monotonic(); + for (int runI = 0; runI < READ_RUNS; runI++) { + if (!chunked) { + std::map data; + for (auto& it : std::get<3>(fileSet.snapshotFile)) { + data.insert({ it.key, it.value }); + } + Version lastFileEndVersion = 0; + for (auto& deltaFile : fileSet.deltaFiles) { + applyDeltasByVersion(std::get<3>(deltaFile), readRange, 0, readVersion, lastFileEndVersion, data); + } + if (clearAllAtEnd) { + applyDelta(readRange, clearAllAtEndMutation, data); + } + RangeResult actualData; + for (auto& it : data) { + actualData.push_back_deep(actualData.arena(), KeyValueRef(it.first, it.second)); + } + serializedBytes += actualData.expectedSize(); + } else { + RangeResult actualData = + materializeBlobGranule(chunk, readRange, 0, readVersion, std::get<2>(fileSet.snapshotFile), deltaPtrs); + serializedBytes += actualData.expectedSize(); + } + } + elapsed += timer_monotonic(); + elapsed /= READ_RUNS; + serializedBytes /= READ_RUNS; + return { serializedBytes, elapsed }; +} + +void printMetrics(int64_t diskBytes, double elapsed, int64_t processesBytes, int64_t logicalSize) { + double storageAmp = (1.0 * diskBytes) / logicalSize; + + double MBperCPUsec = (elapsed == 0.0) ? 0.0 : (processesBytes / 1024.0 / 1024.0) / elapsed; + fmt::print("{}", fmt::format(" {:.6} {:.6}", storageAmp, MBperCPUsec)); +} + +TEST_CASE("!/blobgranule/files/benchFromFiles") { + std::string basePath = "SET_ME"; + std::vector> fileSetNames = { { "SET_ME" } }; + Arena ar; + BlobGranuleCipherKeysCtx cipherKeys = getCipherKeysCtx(ar); + std::vector chunkModes = { false, true }; + std::vector encryptionModes = { false, true }; + std::vector> compressionModes; + compressionModes.push_back({}); +#ifdef ZLIB_LIB_SUPPORTED + compressionModes.push_back(CompressionFilter::GZIP); +#endif + + std::vector runNames = { "logical" }; + std::vector> snapshotMetrics; + std::vector> deltaMetrics; + + std::vector fileSets; + int64_t logicalSnapshotSize = 0; + int64_t logicalDeltaSize = 0; + for (auto& it : fileSetNames) { + FileSet fileSet = loadFileSet(basePath, it); + fileSets.push_back(fileSet); + logicalSnapshotSize += std::get<3>(fileSet.snapshotFile).expectedSize(); + for (auto& deltaFile : fileSet.deltaFiles) { + logicalDeltaSize += std::get<3>(deltaFile).expectedSize(); + } + } + snapshotMetrics.push_back({ logicalSnapshotSize, 0.0 }); + deltaMetrics.push_back({ logicalDeltaSize, 0.0 }); + + for (bool chunk : chunkModes) { + for (bool encrypt : encryptionModes) { + if (!chunk && encrypt) { + continue; + } + Optional keys = encrypt ? cipherKeys : Optional(); + for (auto& compressionFilter : compressionModes) { + if (!chunk && compressionFilter.present()) { + continue; + } + std::string name; + if (!chunk) { + name = "old"; + } else { + if (encrypt) { + name += "ENC"; + } + if (compressionFilter.present()) { + name += "CMP"; + } + if (name.empty()) { + name = "chunked"; + } + } + runNames.push_back(name); + int64_t snapshotTotalBytes = 0; + double snapshotTotalElapsed = 0.0; + for (auto& fileSet : fileSets) { + auto res = doSnapshotWriteBench(std::get<3>(fileSet.snapshotFile), chunk, keys, compressionFilter); + snapshotTotalBytes += res.first; + snapshotTotalElapsed += res.second; + } + snapshotMetrics.push_back({ snapshotTotalBytes, snapshotTotalElapsed }); + + int64_t deltaTotalBytes = 0; + double deltaTotalElapsed = 0.0; + for (auto& fileSet : fileSets) { + for (auto& deltaFile : fileSet.deltaFiles) { + auto res = + doDeltaWriteBench(std::get<3>(deltaFile), fileSet.range, chunk, keys, compressionFilter); + deltaTotalBytes += res.first; + deltaTotalElapsed += res.second; + } + } + deltaMetrics.push_back({ deltaTotalBytes, deltaTotalElapsed }); + } + } + } + + fmt::print("\n\n\n\nWrite Results:\n"); + + ASSERT(runNames.size() == snapshotMetrics.size()); + ASSERT(runNames.size() == deltaMetrics.size()); + for (int i = 0; i < runNames.size(); i++) { + fmt::print("{0}", runNames[i]); + + printMetrics( + snapshotMetrics[i].first, snapshotMetrics[i].second, snapshotMetrics[i].first, snapshotMetrics[0].first); + printMetrics(deltaMetrics[i].first, deltaMetrics[i].second, deltaMetrics[i].first, deltaMetrics[0].first); + + int64_t logicalTotalBytes = snapshotMetrics[0].first + deltaMetrics[0].first; + int64_t totalBytes = deltaMetrics[i].first + snapshotMetrics[i].first; + double logicalTotalElapsed = (snapshotMetrics[i].second == 0.0 || deltaMetrics[i].second == 0.0) + ? 0.0 + : snapshotMetrics[i].second + deltaMetrics[i].second; + printMetrics(totalBytes, logicalTotalElapsed, deltaMetrics[i].first, logicalTotalBytes); + + fmt::print("\n"); + } + + std::vector readRunNames = {}; + std::vector> readMetrics; + + bool doEdgeCaseReadTests = true; + std::vector clearAllReadMetrics; + std::vector readSingleKeyMetrics; + + for (bool chunk : chunkModes) { + for (bool encrypt : encryptionModes) { + if (!chunk && encrypt) { + continue; + } + Optional keys = encrypt ? cipherKeys : Optional(); + for (auto& compressionFilter : compressionModes) { + if (!chunk && compressionFilter.present()) { + continue; + } + std::string name; + if (!chunk) { + name = "old"; + } else { + if (encrypt) { + name += "ENC"; + } + if (compressionFilter.present()) { + name += "CMP"; + } + if (name.empty()) { + name = "chunked"; + } + } + readRunNames.push_back(name); + + int64_t totalBytesRead = 0; + double totalElapsed = 0.0; + double totalElapsedClearAll = 0.0; + double totalElapsedSingleKey = 0.0; + for (auto& fileSet : fileSets) { + FileSet newFileSet; + if (!chunk) { + newFileSet = fileSet; + } else { + newFileSet = rewriteChunkedFileSet(fileSet, keys, compressionFilter); + } + + auto res = doReadBench(newFileSet, chunk, fileSet.range, false, keys, compressionFilter); + totalBytesRead += res.first; + totalElapsed += res.second; + + if (doEdgeCaseReadTests) { + totalElapsedClearAll += + doReadBench(newFileSet, chunk, fileSet.range, true, keys, compressionFilter).second; + Key k = std::get<3>(fileSet.snapshotFile).front().key; + KeyRange singleKeyRange(KeyRangeRef(k, keyAfter(k))); + totalElapsedSingleKey += + doReadBench(newFileSet, chunk, singleKeyRange, false, keys, compressionFilter).second; + } + } + readMetrics.push_back({ totalBytesRead, totalElapsed }); + if (doEdgeCaseReadTests) { + clearAllReadMetrics.push_back(totalElapsedClearAll); + readSingleKeyMetrics.push_back(totalElapsedSingleKey); + } + } + } + } + + fmt::print("\n\nRead Results:\n"); + + ASSERT(readRunNames.size() == readMetrics.size()); + for (int i = 0; i < readRunNames.size(); i++) { + fmt::print("{0}", readRunNames[i]); + + double MBperCPUsec = (readMetrics[i].first / 1024.0 / 1024.0) / readMetrics[i].second; + fmt::print(" {:.6}", MBperCPUsec); + + fmt::print("\n"); + } + + if (doEdgeCaseReadTests) { + ASSERT(readRunNames.size() == clearAllReadMetrics.size()); + ASSERT(readRunNames.size() == readSingleKeyMetrics.size()); + fmt::print("\n\nEdge Case Read Results:\n"); + + for (int i = 0; i < readRunNames.size(); i++) { + fmt::print("{0}", readRunNames[i]); + + // use MB from full read test but elapsed from these tests so the numbers make sense relatively + double MBperCPUsecClearAll = (readMetrics[i].first / 1024.0 / 1024.0) / clearAllReadMetrics[i]; + double MBperCPUsecSingleKey = (readMetrics[i].first / 1024.0 / 1024.0) / readSingleKeyMetrics[i]; + fmt::print(" {:.6} {:.6}", MBperCPUsecClearAll, MBperCPUsecSingleKey); + + fmt::print("\n"); + } + } + + fmt::print("\n\nCombined Results:\n"); + ASSERT(readRunNames.size() == runNames.size() - 1); + for (int i = 0; i < readRunNames.size(); i++) { + fmt::print("{0}", readRunNames[i]); + int64_t logicalBytes = deltaMetrics[i + 1].first; + double totalElapsed = snapshotMetrics[i + 1].second + deltaMetrics[i + 1].second + readMetrics[i].second; + double MBperCPUsec = (logicalBytes / 1024.0 / 1024.0) / totalElapsed; + fmt::print(" {:.6}", MBperCPUsec); + + fmt::print("\n"); + } + + fmt::print("\n\nBenchmark Complete!\n"); + + return Void(); +} diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index 58478c698e..38587014e2 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -914,7 +914,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( BG_SNAPSHOT_FILE_TARGET_CHUNK_BYTES, 64*1024 ); if ( randomize && BUGGIFY ) BG_SNAPSHOT_FILE_TARGET_CHUNK_BYTES = BG_SNAPSHOT_FILE_TARGET_BYTES / (1 << deterministicRandom()->randomInt(0, 8)); init( BG_DELTA_BYTES_BEFORE_COMPACT, BG_SNAPSHOT_FILE_TARGET_BYTES/2 ); init( BG_DELTA_FILE_TARGET_BYTES, BG_DELTA_BYTES_BEFORE_COMPACT/10 ); - init( BG_DELTA_FILE_TARGET_CHUNK_BYTES, 64*1024 ); if ( randomize && BUGGIFY ) BG_DELTA_FILE_TARGET_CHUNK_BYTES = BG_DELTA_FILE_TARGET_BYTES / (1 << deterministicRandom()->randomInt(0, 7)); + init( BG_DELTA_FILE_TARGET_CHUNK_BYTES, 32*1024 ); if ( randomize && BUGGIFY ) BG_DELTA_FILE_TARGET_CHUNK_BYTES = BG_DELTA_FILE_TARGET_BYTES / (1 << deterministicRandom()->randomInt(0, 7)); init( BG_MAX_SPLIT_FANOUT, 10 ); if( randomize && BUGGIFY ) BG_MAX_SPLIT_FANOUT = deterministicRandom()->randomInt(5, 15); init( BG_MAX_MERGE_FANIN, 10 ); if( randomize && BUGGIFY ) BG_MAX_MERGE_FANIN = deterministicRandom()->randomInt(2, 15); init( BG_HOT_SNAPSHOT_VERSIONS, 5000000 ); diff --git a/fdbclient/include/fdbclient/BlobGranuleFiles.h b/fdbclient/include/fdbclient/BlobGranuleFiles.h index 5d895ca9bb..f6a159a7fa 100644 --- a/fdbclient/include/fdbclient/BlobGranuleFiles.h +++ b/fdbclient/include/fdbclient/BlobGranuleFiles.h @@ -27,13 +27,13 @@ #include "flow/CompressionUtils.h" Value serializeChunkedSnapshot(const Standalone& fileNameRef, - Standalone snapshot, + const Standalone& snapshot, int chunkSize, Optional compressFilter, Optional cipherKeysCtx = {}); Value serializeChunkedDeltaFile(const Standalone& fileNameRef, - Standalone deltas, + const Standalone& deltas, const KeyRangeRef& fileRange, int chunkSize, Optional compressFilter, diff --git a/fdbserver/include/fdbserver/DeltaTree.h b/fdbserver/include/fdbserver/DeltaTree.h index a7c73f0d78..6a87ee529d 100644 --- a/fdbserver/include/fdbserver/DeltaTree.h +++ b/fdbserver/include/fdbserver/DeltaTree.h @@ -34,40 +34,6 @@ #define deltatree_printf(...) #endif -typedef uint64_t Word; -// Get the number of prefix bytes that are the same between a and b, up to their common length of cl -static inline int commonPrefixLength(uint8_t const* ap, uint8_t const* bp, int cl) { - int i = 0; - const int wordEnd = cl - sizeof(Word) + 1; - - for (; i < wordEnd; i += sizeof(Word)) { - Word a = *(Word*)ap; - Word b = *(Word*)bp; - if (a != b) { - return i + ctzll(a ^ b) / 8; - } - ap += sizeof(Word); - bp += sizeof(Word); - } - - for (; i < cl; i++) { - if (*ap != *bp) { - return i; - } - ++ap; - ++bp; - } - return cl; -} - -static inline int commonPrefixLength(const StringRef& a, const StringRef& b) { - return commonPrefixLength(a.begin(), b.begin(), std::min(a.size(), b.size())); -} - -static inline int commonPrefixLength(const StringRef& a, const StringRef& b, int skipLen) { - return commonPrefixLength(a.begin() + skipLen, b.begin() + skipLen, std::min(a.size(), b.size()) - skipLen); -} - // This appears to be the fastest version static int lessOrEqualPowerOfTwo(int n) { int p; diff --git a/flow/CompressionUtils.cpp b/flow/CompressionUtils.cpp index c829cafc16..2a90f1e49a 100644 --- a/flow/CompressionUtils.cpp +++ b/flow/CompressionUtils.cpp @@ -128,4 +128,25 @@ TEST_CASE("/CompressionUtils/gzipCompression") { return Void(); } + +TEST_CASE("/CompressionUtils/gzipCompression2") { + Arena arena; + const int size = deterministicRandom()->randomInt(512, 1024); + std::string s(size, 'x'); + Standalone uncompressed = Standalone(StringRef(s)); + printf("Size before: %d\n", (int)uncompressed.size()); + + Standalone compressed = CompressionUtils::compress(CompressionFilter::GZIP, uncompressed, arena); + ASSERT_NE(compressed.compare(uncompressed), 0); + printf("Size after: %d\n", (int)compressed.size()); + // Assert compressed size is less than half. + ASSERT(compressed.size() * 2 < uncompressed.size()); + + StringRef verify = CompressionUtils::decompress(CompressionFilter::GZIP, compressed, arena); + ASSERT_EQ(verify.compare(uncompressed), 0); + + TraceEvent("GzipCompression_Done").log(); + + return Void(); +} #endif diff --git a/flow/include/flow/Arena.h b/flow/include/flow/Arena.h index 9e14990565..63c50a807b 100644 --- a/flow/include/flow/Arena.h +++ b/flow/include/flow/Arena.h @@ -853,6 +853,40 @@ inline bool operator>=(const StringRef& lhs, const StringRef& rhs) { return !(lhs < rhs); } +typedef uint64_t Word; +// Get the number of prefix bytes that are the same between a and b, up to their common length of cl +static inline int commonPrefixLength(uint8_t const* ap, uint8_t const* bp, int cl) { + int i = 0; + const int wordEnd = cl - sizeof(Word) + 1; + + for (; i < wordEnd; i += sizeof(Word)) { + Word a = *(Word*)ap; + Word b = *(Word*)bp; + if (a != b) { + return i + ctzll(a ^ b) / 8; + } + ap += sizeof(Word); + bp += sizeof(Word); + } + + for (; i < cl; i++) { + if (*ap != *bp) { + return i; + } + ++ap; + ++bp; + } + return cl; +} + +static inline int commonPrefixLength(const StringRef& a, const StringRef& b) { + return commonPrefixLength(a.begin(), b.begin(), std::min(a.size(), b.size())); +} + +static inline int commonPrefixLength(const StringRef& a, const StringRef& b, int skipLen) { + return commonPrefixLength(a.begin() + skipLen, b.begin() + skipLen, std::min(a.size(), b.size()) - skipLen); +} + // This trait is used by VectorRef to determine if deep copy constructor should recursively // call deep copies of each element. //