Removed versioned records from the mutation buffer and commit path because support for this was incomplete and has high overhead costs when only exact committed versions are being read. The BTree could still contain versioned records in the future as the read cursors still support this, but some optimizations were added for when all internal records are at version 0 which is the case and has been for quite some time. Mutation buffer now stores keys and values in one arena per buffer.
This commit is contained in:
parent
f1ec780b31
commit
648040b070
|
@ -2812,53 +2812,17 @@ public:
|
|||
// A write shall not become durable until the following call to commit() begins, and shall be durable once the following call to commit() returns
|
||||
void set(KeyValueRef keyValue) {
|
||||
++counts.sets;
|
||||
SingleKeyMutationsByVersion &changes = insertMutationBoundary(keyValue.key)->second.startKeyMutations;
|
||||
|
||||
if(singleVersion) {
|
||||
if(changes.empty()) {
|
||||
changes[0] = SingleKeyMutation(keyValue.value);
|
||||
}
|
||||
else {
|
||||
changes.begin()->second = SingleKeyMutation(keyValue.value);
|
||||
}
|
||||
}
|
||||
else {
|
||||
// Add the set if the changes set is empty or the last entry isn't a set to exactly the same value
|
||||
if(changes.empty() || !changes.rbegin()->second.equalToSet(keyValue.value)) {
|
||||
changes[m_writeVersion] = SingleKeyMutation(keyValue.value);
|
||||
}
|
||||
}
|
||||
m_pBuffer->insertMutationBoundary(keyValue.key)->second.setBoundaryValue(ValueRef(m_pBuffer->arena, keyValue.value));
|
||||
}
|
||||
void clear(KeyRangeRef range) {
|
||||
|
||||
void clear(KeyRangeRef clearedRange) {
|
||||
++counts.clears;
|
||||
MutationBufferT::iterator iBegin = insertMutationBoundary(range.begin);
|
||||
MutationBufferT::iterator iEnd = insertMutationBoundary(range.end);
|
||||
MutationBuffer::MutationsT::iterator iBegin = m_pBuffer->insertMutationBoundary(clearedRange.begin);
|
||||
MutationBuffer::MutationsT::iterator iEnd = m_pBuffer->insertMutationBoundary(clearedRange.end);
|
||||
|
||||
// In single version mode, clear all pending updates in the affected range
|
||||
if(singleVersion) {
|
||||
RangeMutation &range = iBegin->second;
|
||||
range.startKeyMutations.clear();
|
||||
range.startKeyMutations[0] = SingleKeyMutation();
|
||||
range.rangeClearVersion = 0;
|
||||
++iBegin;
|
||||
m_pBuffer->erase(iBegin, iEnd);
|
||||
}
|
||||
else {
|
||||
// For each boundary in the cleared range
|
||||
while(iBegin != iEnd) {
|
||||
RangeMutation &range = iBegin->second;
|
||||
|
||||
// Set the rangeClearedVersion if not set
|
||||
if(!range.rangeClearVersion.present())
|
||||
range.rangeClearVersion = m_writeVersion;
|
||||
|
||||
// Add a clear to the startKeyMutations map if it's empty or the last item is not a clear
|
||||
if(range.startKeyMutations.empty() || !range.startKeyMutations.rbegin()->second.isClear())
|
||||
range.startKeyMutations[m_writeVersion] = SingleKeyMutation();
|
||||
|
||||
++iBegin;
|
||||
}
|
||||
}
|
||||
iBegin->second.clearAll();
|
||||
++iBegin;
|
||||
m_pBuffer->mutations.erase(iBegin, iEnd);
|
||||
}
|
||||
|
||||
void mutate(int op, StringRef param1, StringRef param2) NOT_IMPLEMENTED
|
||||
|
@ -2885,13 +2849,12 @@ public:
|
|||
return m_lastCommittedVersion;
|
||||
}
|
||||
|
||||
VersionedBTree(IPager2 *pager, std::string name, bool singleVersion = false)
|
||||
VersionedBTree(IPager2 *pager, std::string name)
|
||||
: m_pager(pager),
|
||||
m_writeVersion(invalidVersion),
|
||||
m_lastCommittedVersion(invalidVersion),
|
||||
m_pBuffer(nullptr),
|
||||
m_name(name),
|
||||
singleVersion(singleVersion)
|
||||
m_name(name)
|
||||
{
|
||||
m_init = init_impl(this);
|
||||
m_latestCommit = m_init;
|
||||
|
@ -3027,17 +2990,14 @@ public:
|
|||
|
||||
Reference<IStoreCursor> readAtVersion(Version v) {
|
||||
// Only committed versions can be read.
|
||||
Version recordVersion = singleVersion ? 0 : v;
|
||||
ASSERT(v <= m_lastCommittedVersion);
|
||||
if(singleVersion) {
|
||||
ASSERT(v == m_lastCommittedVersion);
|
||||
}
|
||||
Reference<IPagerSnapshot> snapshot = m_pager->getReadSnapshot(v);
|
||||
|
||||
// Snapshot will continue to hold the metakey value memory
|
||||
// This is a ref because snapshot will continue to hold the metakey value memory
|
||||
KeyRef m = snapshot->getMetaKey();
|
||||
|
||||
return Reference<IStoreCursor>(new Cursor(snapshot, ((MetaKey *)m.begin())->root.get(), recordVersion));
|
||||
// Currently all internal records generated in the write path are at version 0
|
||||
return Reference<IStoreCursor>(new Cursor(snapshot, ((MetaKey *)m.begin())->root.get(), (Version)0));
|
||||
}
|
||||
|
||||
// Must be nondecreasing
|
||||
|
@ -3048,13 +3008,6 @@ public:
|
|||
// When starting a new mutation buffer its start version must be greater than the last write version
|
||||
ASSERT(v > m_writeVersion);
|
||||
m_pBuffer = &m_mutationBuffers[v];
|
||||
|
||||
// Create range representing the entire keyspace. This reduces edge cases to applying mutations
|
||||
// because now all existing keys are within some range in the mutation map.
|
||||
(*m_pBuffer)[dbBegin.key] = RangeMutation();
|
||||
// Setting the dbEnd key to be cleared prevents having to treat a range clear to dbEnd as a special
|
||||
// case in order to avoid traversing down the rightmost edge of the tree.
|
||||
(*m_pBuffer)[dbEnd.key].startKeyMutations[0] = SingleKeyMutation();
|
||||
}
|
||||
else {
|
||||
// It's OK to set the write version to the same version repeatedly so long as m_pBuffer is not null
|
||||
|
@ -3114,10 +3067,6 @@ public:
|
|||
return destroyAndCheckSanity_impl(this);
|
||||
}
|
||||
|
||||
bool isSingleVersion() const {
|
||||
return singleVersion;
|
||||
}
|
||||
|
||||
private:
|
||||
struct VersionAndChildrenRef {
|
||||
VersionAndChildrenRef(Version v, VectorRef<RedwoodRecordRef> children, RedwoodRecordRef upperBound)
|
||||
|
@ -3281,47 +3230,102 @@ private:
|
|||
}
|
||||
};
|
||||
|
||||
// Represents mutations on a single key and a possible clear to a range that begins
|
||||
// immediately after that key
|
||||
typedef std::map<Version, SingleKeyMutation> SingleKeyMutationsByVersion;
|
||||
struct RangeMutation {
|
||||
// Mutations for exactly the start key
|
||||
SingleKeyMutationsByVersion startKeyMutations;
|
||||
// A clear range version, if cleared, for the range starting immediately AFTER the start key
|
||||
Optional<Version> rangeClearVersion;
|
||||
|
||||
bool keyCleared() const {
|
||||
return startKeyMutations.size() == 1 && startKeyMutations.begin()->second.isClear();
|
||||
RangeMutation() : boundaryChanged(false), clearAfterBoundary(false) {
|
||||
}
|
||||
|
||||
bool keyChanged() const {
|
||||
return !startKeyMutations.empty();
|
||||
}
|
||||
bool boundaryChanged;
|
||||
Optional<ValueRef> boundaryValue; // Not present means cleared
|
||||
bool clearAfterBoundary;
|
||||
|
||||
bool rangeCleared() const {
|
||||
return rangeClearVersion.present();
|
||||
bool boundaryCleared() const {
|
||||
return boundaryChanged && !boundaryValue.present();
|
||||
}
|
||||
|
||||
// Returns true if this RangeMutation doesn't actually mutate anything
|
||||
bool noChanges() const {
|
||||
return !rangeClearVersion.present() && startKeyMutations.empty();
|
||||
return !boundaryChanged && !clearAfterBoundary;
|
||||
}
|
||||
|
||||
void clearBoundary() {
|
||||
boundaryChanged = true;
|
||||
boundaryValue.reset();
|
||||
}
|
||||
|
||||
void clearAll() {
|
||||
clearBoundary();
|
||||
clearAfterBoundary = true;
|
||||
}
|
||||
|
||||
void setBoundaryValue(ValueRef v) {
|
||||
boundaryChanged = true;
|
||||
boundaryValue = v;
|
||||
}
|
||||
|
||||
bool boundarySet() const {
|
||||
return boundaryChanged && boundaryValue.present();
|
||||
}
|
||||
|
||||
std::string toString() const {
|
||||
std::string result;
|
||||
result.append("rangeClearVersion: ");
|
||||
if(rangeClearVersion.present())
|
||||
result.append(format("%" PRId64 "", rangeClearVersion.get()));
|
||||
else
|
||||
result.append("<not present>");
|
||||
result.append(" startKeyMutations: ");
|
||||
for(SingleKeyMutationsByVersion::value_type const &m : startKeyMutations)
|
||||
result.append(format("[%" PRId64 " => %s] ", m.first, m.second.toString().c_str()));
|
||||
return result;
|
||||
return format("boundaryChanged=%d clearAfterBoundary=%d boundaryValue=%s", boundaryChanged, clearAfterBoundary, ::toString(boundaryValue).c_str());
|
||||
}
|
||||
};
|
||||
|
||||
typedef std::map<Key, RangeMutation> MutationBufferT;
|
||||
struct MutationBuffer {
|
||||
MutationBuffer() {
|
||||
// Create range representing the entire keyspace. This reduces edge cases to applying mutations
|
||||
// because now all existing keys are within some range in the mutation map.
|
||||
mutations[dbBegin.key];
|
||||
// Setting the dbEnd key to be cleared prevents having to treat a range clear to dbEnd as a special
|
||||
// case in order to avoid traversing down the rightmost edge of the tree.
|
||||
mutations[dbEnd.key].clearBoundary();
|
||||
}
|
||||
|
||||
Arena arena;
|
||||
typedef std::map<KeyRef, RangeMutation> MutationsT;
|
||||
typedef MutationsT::iterator iterator;
|
||||
typedef MutationsT::const_iterator const_iterator;
|
||||
MutationsT mutations;
|
||||
|
||||
const_iterator upper_bound(KeyRef k) const {
|
||||
return mutations.upper_bound(k);
|
||||
}
|
||||
|
||||
const_iterator lower_bound(KeyRef k) const {
|
||||
return mutations.lower_bound(k);
|
||||
}
|
||||
|
||||
// Find or create a mutation buffer boundary for bound and return an iterator to it
|
||||
iterator insertMutationBoundary(KeyRef boundary) {
|
||||
// Find the first split point in buffer that is >= key
|
||||
// Since the initial state of the mutation buffer contains the range '' through
|
||||
// the maximum possible key, our search had to have found something so we
|
||||
// can assume the iterator is valid.
|
||||
iterator ib = mutations.lower_bound(boundary);
|
||||
|
||||
// If we found the boundary we are looking for, return its iterator
|
||||
if(ib->first == boundary) {
|
||||
return ib;
|
||||
}
|
||||
|
||||
// ib is our insert hint. Copy boundary into arena and insert boundary into buffer
|
||||
boundary = KeyRef(arena, boundary);
|
||||
ib = mutations.insert(ib, {boundary, RangeMutation()});
|
||||
|
||||
// ib is certainly > begin() because it is guaranteed that the empty string
|
||||
// boundary exists and the only way to have found that is to look explicitly
|
||||
// for it in which case we would have returned above.
|
||||
iterator iPrevious = ib;
|
||||
--iPrevious;
|
||||
// If the range we just divided was being cleared, then the dividing boundary key and range after it must also be cleared
|
||||
if(iPrevious->second.clearAfterBoundary) {
|
||||
ib->second.clearAll();
|
||||
}
|
||||
|
||||
return ib;
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
/* Mutation Buffer Overview
|
||||
*
|
||||
|
@ -3373,8 +3377,8 @@ private:
|
|||
*/
|
||||
|
||||
IPager2 *m_pager;
|
||||
MutationBufferT *m_pBuffer;
|
||||
std::map<Version, MutationBufferT> m_mutationBuffers;
|
||||
MutationBuffer *m_pBuffer;
|
||||
std::map<Version, MutationBuffer> m_mutationBuffers;
|
||||
|
||||
Version m_writeVersion;
|
||||
Version m_lastCommittedVersion;
|
||||
|
@ -3382,7 +3386,6 @@ private:
|
|||
Future<Void> m_latestCommit;
|
||||
Future<Void> m_init;
|
||||
std::string m_name;
|
||||
bool singleVersion;
|
||||
|
||||
// MetaKey changes size so allocate space for it to expand into
|
||||
union {
|
||||
|
@ -3393,38 +3396,6 @@ private:
|
|||
LazyDeleteQueueT m_lazyDeleteQueue;
|
||||
int m_maxPartSize;
|
||||
|
||||
// Find or create a mutation buffer boundary for bound and return an iterator to it
|
||||
MutationBufferT::iterator insertMutationBoundary(Key boundary) {
|
||||
ASSERT(m_pBuffer != nullptr);
|
||||
|
||||
// Find the first split point in buffer that is >= key
|
||||
MutationBufferT::iterator ib = m_pBuffer->lower_bound(boundary);
|
||||
|
||||
// Since the initial state of the mutation buffer contains the range '' through
|
||||
// the maximum possible key, our search had to have found something.
|
||||
ASSERT(ib != m_pBuffer->end());
|
||||
|
||||
// If we found the boundary we are looking for, return its iterator
|
||||
if(ib->first == boundary) {
|
||||
return ib;
|
||||
}
|
||||
|
||||
// ib is our insert hint. Insert the new boundary and set ib to its entry
|
||||
ib = m_pBuffer->insert(ib, {boundary, RangeMutation()});
|
||||
|
||||
// ib is certainly > begin() because it is guaranteed that the empty string
|
||||
// boundary exists and the only way to have found that is to look explicitly
|
||||
// for it in which case we would have returned above.
|
||||
MutationBufferT::iterator iPrevious = ib;
|
||||
--iPrevious;
|
||||
if(iPrevious->second.rangeClearVersion.present()) {
|
||||
ib->second.rangeClearVersion = iPrevious->second.rangeClearVersion;
|
||||
ib->second.startKeyMutations[iPrevious->second.rangeClearVersion.get()] = SingleKeyMutation();
|
||||
}
|
||||
|
||||
return ib;
|
||||
}
|
||||
|
||||
// Writes entries to 1 or more pages and return a vector of boundary keys with their IPage(s)
|
||||
ACTOR static Future<Standalone<VectorRef<RedwoodRecordRef>>> writePages(VersionedBTree *self, bool minimalBoundaries, const RedwoodRecordRef *lowerBound, const RedwoodRecordRef *upperBound, VectorRef<RedwoodRecordRef> entries, int height, Version v, BTreePageID previousID) {
|
||||
ASSERT(entries.size() > 0);
|
||||
|
@ -3807,9 +3778,9 @@ private:
|
|||
// iMutationBoundaryEnd is least boundary >= upperBound->key
|
||||
ACTOR static Future<Standalone<VersionedChildrenT>> commitSubtree(
|
||||
VersionedBTree *self,
|
||||
MutationBufferT *mutationBuffer,
|
||||
//MutationBufferT::const_iterator iMutationBoundary, // = mutationBuffer->upper_bound(lowerBound->key); --iMutationBoundary;
|
||||
//MutationBufferT::const_iterator iMutationBoundaryEnd, // = mutationBuffer->lower_bound(upperBound->key);
|
||||
MutationBuffer *mutationBuffer,
|
||||
//MutationBuffer::const_iterator iMutationBoundary, // = mutationBuffer->upper_bound(lowerBound->key); --iMutationBoundary;
|
||||
//MutationBuffer::const_iterator iMutationBoundaryEnd, // = mutationBuffer->lower_bound(upperBound->key);
|
||||
Reference<IPagerSnapshot> snapshot,
|
||||
BTreePageID rootID,
|
||||
bool isLeaf,
|
||||
|
@ -3825,6 +3796,7 @@ private:
|
|||
context = format("CommitSubtree(root=%s): ", toString(rootID).c_str());
|
||||
}
|
||||
|
||||
state Version writeVersion = self->getLastCommittedVersion() + 1;
|
||||
state Standalone<VersionedChildrenT> results;
|
||||
|
||||
debug_printf("%s lower=%s upper=%s\n", context.c_str(), lowerBound->toString().c_str(), upperBound->toString().c_str());
|
||||
|
@ -3832,9 +3804,9 @@ private:
|
|||
self->counts.commitToPageStart++;
|
||||
|
||||
// Find the slice of the mutation buffer that is relevant to this subtree
|
||||
state MutationBufferT::const_iterator iMutationBoundary = mutationBuffer->upper_bound(lowerBound->key);
|
||||
state MutationBuffer::const_iterator iMutationBoundary = mutationBuffer->upper_bound(lowerBound->key);
|
||||
--iMutationBoundary;
|
||||
state MutationBufferT::const_iterator iMutationBoundaryEnd = mutationBuffer->lower_bound(upperBound->key);
|
||||
state MutationBuffer::const_iterator iMutationBoundaryEnd = mutationBuffer->lower_bound(upperBound->key);
|
||||
|
||||
if(REDWOOD_DEBUG) {
|
||||
debug_printf("%s ---------MUTATION BUFFER SLICE ---------------------\n", context.c_str());
|
||||
|
@ -3857,14 +3829,13 @@ private:
|
|||
// If there are any changes to the one key then the entire subtree should be deleted as the changes for the key
|
||||
// do not go into this subtree.
|
||||
if(iMutationBoundary == iMutationBoundaryEnd) {
|
||||
if(iMutationBoundary->second.keyChanged()) {
|
||||
if(iMutationBoundary->second.boundaryChanged) {
|
||||
debug_printf("%s lower and upper bound key/version match and key is modified so deleting page, returning %s\n", context.c_str(), toString(results).c_str());
|
||||
Version firstKeyChangeVersion = self->singleVersion ? self->getLastCommittedVersion() + 1 : iMutationBoundary->second.startKeyMutations.begin()->first;
|
||||
if(isLeaf) {
|
||||
self->freeBtreePage(rootID, firstKeyChangeVersion);
|
||||
self->freeBtreePage(rootID, writeVersion);
|
||||
}
|
||||
else {
|
||||
self->m_lazyDeleteQueue.pushBack(LazyDeleteQueueEntry{firstKeyChangeVersion, rootID});
|
||||
self->m_lazyDeleteQueue.pushBack(LazyDeleteQueueEntry{writeVersion, rootID});
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
@ -3877,13 +3848,13 @@ private:
|
|||
|
||||
// If one mutation range covers the entire subtree, then check if the entire subtree is modified,
|
||||
// unmodified, or possibly/partially modified.
|
||||
MutationBufferT::const_iterator iMutationBoundaryNext = iMutationBoundary;
|
||||
MutationBuffer::const_iterator iMutationBoundaryNext = iMutationBoundary;
|
||||
++iMutationBoundaryNext;
|
||||
if(iMutationBoundaryNext == iMutationBoundaryEnd) {
|
||||
// Cleared means the entire range covering the subtree was cleared. It is assumed true
|
||||
// if the range starting after the lower mutation boundary was cleared, and then proven false
|
||||
// below if possible.
|
||||
bool cleared = iMutationBoundary->second.rangeCleared();
|
||||
bool cleared = iMutationBoundary->second.clearAfterBoundary;
|
||||
// Unchanged means the entire range covering the subtree was unchanged, it is assumed to be the
|
||||
// opposite of cleared() and then proven false below if possible.
|
||||
bool unchanged = !cleared;
|
||||
|
@ -3893,12 +3864,12 @@ private:
|
|||
// that key is being changed or cleared affects this subtree.
|
||||
if(iMutationBoundary->first == lowerBound->key) {
|
||||
// If subtree will be cleared (so far) but the lower boundary key is not cleared then the subtree is not cleared
|
||||
if(cleared && !iMutationBoundary->second.keyCleared()) {
|
||||
if(cleared && !iMutationBoundary->second.boundaryCleared()) {
|
||||
cleared = false;
|
||||
debug_printf("%s cleared=%d unchanged=%d\n", context.c_str(), cleared, unchanged);
|
||||
}
|
||||
// If the subtree looked unchanged (so far) but the lower boundary is is changed then the subtree is changed
|
||||
if(unchanged && iMutationBoundary->second.keyChanged()) {
|
||||
if(unchanged && iMutationBoundary->second.boundaryChanged) {
|
||||
unchanged = false;
|
||||
debug_printf("%s cleared=%d unchanged=%d\n", context.c_str(), cleared, unchanged);
|
||||
}
|
||||
|
@ -3909,7 +3880,7 @@ private:
|
|||
if((cleared || unchanged) && iMutationBoundaryEnd->first == upperBound->key) {
|
||||
// If the key is being changed then the records in this subtree with the same key must be removed
|
||||
// so the subtree is definitely not unchanged, though it may be cleared to achieve the same effect.
|
||||
if(iMutationBoundaryEnd->second.keyChanged()) {
|
||||
if(iMutationBoundaryEnd->second.boundaryChanged) {
|
||||
unchanged = false;
|
||||
debug_printf("%s cleared=%d unchanged=%d\n", context.c_str(), cleared, unchanged);
|
||||
}
|
||||
|
@ -3934,12 +3905,11 @@ private:
|
|||
// If subtree is cleared
|
||||
if(cleared) {
|
||||
debug_printf("%s %s cleared, deleting it, returning %s\n", context.c_str(), isLeaf ? "Page" : "Subtree", toString(results).c_str());
|
||||
Version clearVersion = self->singleVersion ? self->getLastCommittedVersion() + 1 : iMutationBoundary->second.rangeClearVersion.get();
|
||||
if(isLeaf) {
|
||||
self->freeBtreePage(rootID, clearVersion);
|
||||
self->freeBtreePage(rootID, writeVersion);
|
||||
}
|
||||
else {
|
||||
self->m_lazyDeleteQueue.pushBack(LazyDeleteQueueEntry{clearVersion, rootID});
|
||||
self->m_lazyDeleteQueue.pushBack(LazyDeleteQueueEntry{writeVersion, rootID});
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
@ -3952,7 +3922,6 @@ private:
|
|||
debug_printf("%s commitSubtree(): %s\n", context.c_str(), btPage->toString(false, rootID, snapshot->getVersion(), decodeLowerBound, decodeUpperBound).c_str());
|
||||
|
||||
state BTreePage::BinaryTree::Cursor cursor;
|
||||
state Version writeVersion;
|
||||
|
||||
if(REDWOOD_DEBUG) {
|
||||
debug_printf("%s ---------MUTATION BUFFER SLICE ---------------------\n", context.c_str());
|
||||
|
@ -4011,36 +3980,24 @@ private:
|
|||
while(iMutationBoundary != iMutationBoundaryEnd) {
|
||||
debug_printf("%s New mutation boundary: '%s': %s\n", context.c_str(), printable(iMutationBoundary->first).c_str(), iMutationBoundary->second.toString().c_str());
|
||||
|
||||
SingleKeyMutationsByVersion::const_iterator iMutations;
|
||||
|
||||
// For the first mutation boundary only, if the boundary key is less than the lower bound for the page
|
||||
// then skip startKeyMutations for this boundary, we're only processing this mutation range here to apply
|
||||
// a possible clear to existing data.
|
||||
if(firstMutationBoundary && iMutationBoundary->first < lowerBound->key) {
|
||||
iMutations = iMutationBoundary->second.startKeyMutations.end();
|
||||
}
|
||||
else {
|
||||
iMutations = iMutationBoundary->second.startKeyMutations.begin();
|
||||
}
|
||||
// Apply the change to the mutation buffer start boundary key only if
|
||||
// - there actually is a change (whether a set or a clear, old records are to be removed)
|
||||
// - either this is not the first boundary or it is but its key matches our lower bound key
|
||||
bool applyBoundaryChange = iMutationBoundary->second.boundaryChanged && (!firstMutationBoundary || iMutationBoundary->first >= lowerBound->key);
|
||||
firstMutationBoundary = false;
|
||||
|
||||
SingleKeyMutationsByVersion::const_iterator iMutationsEnd = iMutationBoundary->second.startKeyMutations.end();
|
||||
|
||||
// Iterate over old versions of the mutation boundary key, outputting if necessary
|
||||
bool boundaryKeyWritten = false;
|
||||
|
||||
// Iterate over records for the mutation boundary key, keep them unless the boundary key was changed or we are not applying it
|
||||
while(cursor.valid() && cursor.get().key == iMutationBoundary->first) {
|
||||
// If not in single version mode or there were no changes to the key
|
||||
if(!self->singleVersion || iMutationBoundary->second.noChanges()) {
|
||||
// If not updating, add to the output set, otherwise do nothing
|
||||
// If there were no changes to the key or we're not applying it
|
||||
if(!applyBoundaryChange) {
|
||||
// If not updating, add to the output set, otherwise skip ahead past the records for the mutation boundary
|
||||
if(!updating) {
|
||||
merged.push_back(merged.arena(), cursor.get());
|
||||
debug_printf("%s Added %s [existing, boundary start]\n", context.c_str(), cursor.get().toString().c_str());
|
||||
}
|
||||
boundaryKeyWritten = true;
|
||||
cursor.moveNext();
|
||||
}
|
||||
else {
|
||||
ASSERT(self->singleVersion);
|
||||
changesMade = true;
|
||||
// If updating, erase from the page, otherwise do not add to the output set
|
||||
if(updating) {
|
||||
|
@ -4048,7 +4005,7 @@ private:
|
|||
cursor.erase();
|
||||
}
|
||||
else {
|
||||
debug_printf("%s Skipped %s [existing, boundary start, singleVersion mode]\n", context.c_str(), cursor.get().toString().c_str());
|
||||
debug_printf("%s Skipped %s [existing, boundary start]\n", context.c_str(), cursor.get().toString().c_str());
|
||||
cursor.moveNext();
|
||||
}
|
||||
}
|
||||
|
@ -4056,54 +4013,40 @@ private:
|
|||
|
||||
constexpr int maxHeightAllowed = 8;
|
||||
|
||||
// TODO: If a mutation set is equal to the previous existing value of the key, maybe don't write it.
|
||||
// Output mutations for the mutation boundary start key
|
||||
while(iMutations != iMutationsEnd) {
|
||||
const SingleKeyMutation &m = iMutations->second;
|
||||
if(m.isClear() || m.value.size() <= self->m_maxPartSize) {
|
||||
// If the boundary key was not yet written to the merged list then clears can be skipped.
|
||||
// Note that in a more complex scenario where there are multiple sibling pages for the same key, with different
|
||||
// versions and/or part numbers, this is still a valid thing to do. This is because a changing boundary
|
||||
// key (set or clear) will result in any instances (different versions, split parts) of this key
|
||||
// on sibling pages to the left of this page to be removed, so an explicit clear need only be stored
|
||||
// if a record with the mutation boundary key was already written to this page.
|
||||
if(!boundaryKeyWritten && iMutations->second.isClear()) {
|
||||
debug_printf("%s Skipped %s [mutation, unnecessary boundary key clear]\n", context.c_str(), m.toRecord(iMutationBoundary->first, iMutations->first).toString().c_str());
|
||||
}
|
||||
else {
|
||||
RedwoodRecordRef rec = m.toRecord(iMutationBoundary->first, iMutations->first);
|
||||
// If updating, add to the page, else add to the output set
|
||||
if(updating) {
|
||||
if(cursor.mirror->insert(rec, skipLen, maxHeightAllowed)) {
|
||||
debug_printf("%s Inserted non-split %s [mutation, boundary start]\n", context.c_str(), rec.toString().c_str());
|
||||
}
|
||||
else {
|
||||
debug_printf("%s Inserted failed for non-split %s [mutation, boundary start]\n", context.c_str(), rec.toString().c_str());
|
||||
switchToLinearMerge();
|
||||
}
|
||||
}
|
||||
if(!updating) {
|
||||
merged.push_back(merged.arena(), rec);
|
||||
debug_printf("%s Added non-split %s [mutation, boundary start]\n", context.c_str(), rec.toString().c_str());
|
||||
}
|
||||
// Write the new record(s) for the mutation boundary start key if its value has been set
|
||||
// Clears of this key will have been processed above by not being erased from the updated page or excluded from the merge output
|
||||
if(applyBoundaryChange && iMutationBoundary->second.boundarySet()) {
|
||||
RedwoodRecordRef rec(iMutationBoundary->first, 0, iMutationBoundary->second.boundaryValue.get());
|
||||
changesMade = true;
|
||||
|
||||
changesMade = true;
|
||||
boundaryKeyWritten = true;
|
||||
if(rec.value.get().size() <= self->m_maxPartSize) {
|
||||
// If updating, add to the page, else add to the output set
|
||||
if(updating) {
|
||||
if(cursor.mirror->insert(rec, skipLen, maxHeightAllowed)) {
|
||||
debug_printf("%s Inserted non-split %s [mutation, boundary start]\n", context.c_str(), rec.toString().c_str());
|
||||
}
|
||||
else {
|
||||
debug_printf("%s Inserted failed for non-split %s [mutation, boundary start]\n", context.c_str(), rec.toString().c_str());
|
||||
switchToLinearMerge();
|
||||
}
|
||||
}
|
||||
if(!updating) {
|
||||
merged.push_back(merged.arena(), rec);
|
||||
debug_printf("%s Added non-split %s [mutation, boundary start]\n", context.c_str(), rec.toString().c_str());
|
||||
}
|
||||
|
||||
}
|
||||
else {
|
||||
changesMade = true;
|
||||
int bytesLeft = m.value.size();
|
||||
int bytesLeft = rec.value.get().size();
|
||||
int start = 0;
|
||||
RedwoodRecordRef whole(iMutationBoundary->first, iMutations->first, m.value);
|
||||
while(bytesLeft > 0) {
|
||||
int partSize = std::min(bytesLeft, self->m_maxPartSize);
|
||||
// Don't copy the value chunk because this page will stay in memory until after we've built new version(s) of it
|
||||
RedwoodRecordRef rec = whole.split(start, partSize);
|
||||
// Don't copy the value chunk because mutation buffer will stay in memory until after the new page is written
|
||||
RedwoodRecordRef part = rec.split(start, partSize);
|
||||
bytesLeft -= partSize;
|
||||
|
||||
if(updating) {
|
||||
if(cursor.mirror->insert(rec, skipLen, maxHeightAllowed)) {
|
||||
if(cursor.mirror->insert(part, skipLen, maxHeightAllowed)) {
|
||||
debug_printf("%s Inserted split %s [mutation, boundary start] bytesLeft %d\n", context.c_str(), rec.toString().c_str(), bytesLeft);
|
||||
}
|
||||
else {
|
||||
|
@ -4113,19 +4056,17 @@ private:
|
|||
}
|
||||
|
||||
if(!updating) {
|
||||
merged.push_back(merged.arena(), rec);
|
||||
merged.push_back(merged.arena(), part);
|
||||
debug_printf("%s Added split %s [mutation, boundary start] bytesLeft %d\n", context.c_str(), rec.toString().c_str(), bytesLeft);
|
||||
}
|
||||
|
||||
start += partSize;
|
||||
}
|
||||
boundaryKeyWritten = true;
|
||||
}
|
||||
++iMutations;
|
||||
}
|
||||
|
||||
// Get the clear version for this range, which is the last thing that we need from it,
|
||||
Optional<Version> clearRangeVersion = iMutationBoundary->second.rangeClearVersion;
|
||||
// Before advancing the iterator, get whether or not the records in the following range must be removed
|
||||
bool remove = iMutationBoundary->second.clearAfterBoundary;
|
||||
// Advance to the next boundary because we need to know the end key for the current range.
|
||||
++iMutationBoundary;
|
||||
if(iMutationBoundary == iMutationBoundaryEnd) {
|
||||
|
@ -4135,7 +4076,6 @@ private:
|
|||
debug_printf("%s Mutation range end: '%s'\n", context.c_str(), printable(iMutationBoundary->first).c_str());
|
||||
|
||||
// Now handle the records up through but not including the next mutation boundary key
|
||||
bool remove = self->singleVersion && clearRangeVersion.present();
|
||||
RedwoodRecordRef end(iMutationBoundary->first);
|
||||
|
||||
// If the records are being removed and we're not doing an in-place update
|
||||
|
@ -4155,7 +4095,7 @@ private:
|
|||
// linear merge than the visit is to add them to the output set.
|
||||
while(cursor.valid() && cursor.get().compare(end, skipLen) < 0) {
|
||||
if(updating) {
|
||||
debug_printf("%s Erasing %s [existing, boundary start, singleVersion mode]\n", context.c_str(), cursor.get().toString().c_str());
|
||||
debug_printf("%s Erasing %s [existing, boundary start]\n", context.c_str(), cursor.get().toString().c_str());
|
||||
cursor.erase();
|
||||
changesMade = true;
|
||||
}
|
||||
|
@ -4171,7 +4111,7 @@ private:
|
|||
// If there are still more records, they have the same key as the end boundary
|
||||
if(cursor.valid()) {
|
||||
// If the end boundary is changing, we must remove the remaining records in this page
|
||||
bool remove = iMutationBoundaryEnd->second.keyChanged();
|
||||
bool remove = iMutationBoundaryEnd->second.boundaryChanged;
|
||||
if(remove) {
|
||||
changesMade = true;
|
||||
}
|
||||
|
@ -4310,9 +4250,6 @@ private:
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: Either handle multi-versioned results or change commitSubtree interface to return a single child set.
|
||||
ASSERT(self->singleVersion);
|
||||
writeVersion = self->getLastCommittedVersion() + 1;
|
||||
// All of the things added to pageBuilder will exist in the arenas inside futureChildren or will be upperBound
|
||||
BTreePage::BinaryTree::Cursor c = getCursor(page);
|
||||
c.moveFirst();
|
||||
|
@ -4360,7 +4297,7 @@ private:
|
|||
}
|
||||
|
||||
ACTOR static Future<Void> commit_impl(VersionedBTree *self) {
|
||||
state MutationBufferT *mutations = self->m_pBuffer;
|
||||
state MutationBuffer *mutations = self->m_pBuffer;
|
||||
|
||||
// No more mutations are allowed to be written to this mutation buffer we will commit
|
||||
// at m_writeVersion, which we must save locally because it could change during commit.
|
||||
|
@ -4561,6 +4498,14 @@ private:
|
|||
return present() && pageCursor->cursor.get().version <= v;
|
||||
}
|
||||
|
||||
// This is to enable an optimization for the case where all internal records are at the
|
||||
// same version and there are no implicit clears
|
||||
// *this MUST be valid()
|
||||
bool presentAtExactVersionUnsharded(Version v) const {
|
||||
auto const &rec = pageCursor->cursor.get();
|
||||
return rec.value.present() && rec.version == v && rec.chunk.total == 0;
|
||||
}
|
||||
|
||||
// Returns true if cursor position is present() and has an effective version <= v
|
||||
bool validAtVersion(Version v) {
|
||||
return valid() && pageCursor->cursor.get().version <= v;
|
||||
|
@ -4737,8 +4682,8 @@ private:
|
|||
// KeyValueRefs returned become invalid once the cursor is moved
|
||||
class Cursor : public IStoreCursor, public ReferenceCounted<Cursor>, public FastAllocated<Cursor>, NonCopyable {
|
||||
public:
|
||||
Cursor(Reference<IPagerSnapshot> pageSource, BTreePageID root, Version recordVersion)
|
||||
: m_version(recordVersion),
|
||||
Cursor(Reference<IPagerSnapshot> pageSource, BTreePageID root, Version internalRecordVersion)
|
||||
: m_version(internalRecordVersion),
|
||||
m_cur1(pageSource, root),
|
||||
m_cur2(m_cur1)
|
||||
{
|
||||
|
@ -4922,9 +4867,10 @@ private:
|
|||
self->m_kv.reset();
|
||||
while(self->m_cur1.valid()) {
|
||||
|
||||
if(self->m_cur1.presentAtVersion(self->m_version) &&
|
||||
if(self->m_cur1.presentAtExactVersionUnsharded(self->m_version) ||
|
||||
(self->m_cur1.presentAtVersion(self->m_version) &&
|
||||
(!self->m_cur2.validAtVersion(self->m_version) ||
|
||||
self->m_cur2.get().key != self->m_cur1.get().key)
|
||||
self->m_cur2.get().key != self->m_cur1.get().key))
|
||||
) {
|
||||
wait(readFullKVPair(self));
|
||||
return Void();
|
||||
|
@ -5004,7 +4950,7 @@ public:
|
|||
KeyValueStoreRedwoodUnversioned(std::string filePrefix, UID logID) : m_filePrefix(filePrefix) {
|
||||
// TODO: This constructor should really just take an IVersionedStore
|
||||
IPager2 *pager = new DWALPager(4096, filePrefix, 0);
|
||||
m_tree = new VersionedBTree(pager, filePrefix, true);
|
||||
m_tree = new VersionedBTree(pager, filePrefix);
|
||||
m_init = catchError(init_impl(this));
|
||||
}
|
||||
|
||||
|
@ -5319,8 +5265,8 @@ ACTOR Future<int> verifyRange(VersionedBTree *btree, Key start, Key end, Version
|
|||
|
||||
debug_printf("VerifyRangeReverse(@%" PRId64 ", %s, %s): start\n", v, start.toString().c_str(), end.toString().c_str());
|
||||
|
||||
// Randomly use a new cursor for the reverse range read but only if version history is available
|
||||
if(!btree->isSingleVersion() && deterministicRandom()->coinflip()) {
|
||||
// Randomly use a new cursor at the same version for the reverse range read, if the version is still available for opening new cursors
|
||||
if(v >= btree->getOldestVersion() && deterministicRandom()->coinflip()) {
|
||||
cur = btree->readAtVersion(v);
|
||||
}
|
||||
|
||||
|
@ -5365,20 +5311,18 @@ ACTOR Future<int> verifyRange(VersionedBTree *btree, Key start, Key end, Version
|
|||
return errors;
|
||||
}
|
||||
|
||||
ACTOR Future<int> verifyAll(VersionedBTree *btree, Version maxCommittedVersion, std::map<std::pair<std::string, Version>, Optional<std::string>> *written, int *pErrorCount) {
|
||||
// Read back every key at every version set or cleared and verify the result.
|
||||
// Verify the result of point reads for every set or cleared key at the given version
|
||||
ACTOR Future<int> seekAll(VersionedBTree *btree, Version v, std::map<std::pair<std::string, Version>, Optional<std::string>> *written, int *pErrorCount) {
|
||||
state std::map<std::pair<std::string, Version>, Optional<std::string>>::const_iterator i = written->cbegin();
|
||||
state std::map<std::pair<std::string, Version>, Optional<std::string>>::const_iterator iEnd = written->cend();
|
||||
state int errors = 0;
|
||||
state Reference<IStoreCursor> cur = btree->readAtVersion(v);
|
||||
|
||||
while(i != iEnd) {
|
||||
state std::string key = i->first.first;
|
||||
state Version ver = i->first.second;
|
||||
if(ver <= maxCommittedVersion) {
|
||||
if(ver == v) {
|
||||
state Optional<std::string> val = i->second;
|
||||
|
||||
state Reference<IStoreCursor> cur = btree->readAtVersion(ver);
|
||||
|
||||
debug_printf("Verifying @%" PRId64 " '%s'\n", ver, key.c_str());
|
||||
state Arena arena;
|
||||
wait(cur->findEqual(KeyRef(arena, key)));
|
||||
|
@ -5408,39 +5352,52 @@ ACTOR Future<int> verifyAll(VersionedBTree *btree, Version maxCommittedVersion,
|
|||
}
|
||||
|
||||
ACTOR Future<Void> verify(VersionedBTree *btree, FutureStream<Version> vStream, std::map<std::pair<std::string, Version>, Optional<std::string>> *written, int *pErrorCount, bool serial) {
|
||||
state Future<int> vall;
|
||||
state Future<int> vrange;
|
||||
state Future<int> fRangeAll;
|
||||
state Future<int> fRangeRandom;
|
||||
state Future<int> fSeekAll;
|
||||
|
||||
// Queue of committed versions still readable from btree
|
||||
state std::deque<Version> committedVersions;
|
||||
|
||||
try {
|
||||
loop {
|
||||
state Version v = waitNext(vStream);
|
||||
committedVersions.push_back(v);
|
||||
|
||||
if(btree->isSingleVersion()) {
|
||||
v = btree->getLastCommittedVersion();
|
||||
debug_printf("Verifying at latest committed version %" PRId64 "\n", v);
|
||||
vall = verifyRange(btree, LiteralStringRef(""), LiteralStringRef("\xff\xff"), v, written, pErrorCount);
|
||||
if(serial) {
|
||||
wait(success(vall));
|
||||
}
|
||||
vrange = verifyRange(btree, randomKV().key, randomKV().key, v, written, pErrorCount);
|
||||
if(serial) {
|
||||
wait(success(vrange));
|
||||
}
|
||||
// Remove expired versions
|
||||
while(!committedVersions.empty() && committedVersions.front() < btree->getOldestVersion()) {
|
||||
committedVersions.pop_front();
|
||||
}
|
||||
else {
|
||||
debug_printf("Verifying through version %" PRId64 "\n", v);
|
||||
vall = verifyAll(btree, v, written, pErrorCount);
|
||||
if(serial) {
|
||||
wait(success(vall));
|
||||
}
|
||||
vrange = verifyRange(btree, randomKV().key, randomKV().key, deterministicRandom()->randomInt(1, v + 1), written, pErrorCount);
|
||||
if(serial) {
|
||||
wait(success(vrange));
|
||||
}
|
||||
}
|
||||
wait(success(vall) && success(vrange));
|
||||
|
||||
debug_printf("Verified through version %" PRId64 ", %d errors\n", v, *pErrorCount);
|
||||
// Choose a random committed version, or sometimes the latest (which could be ahead of the latest version from vStream)
|
||||
v = (committedVersions.empty() || deterministicRandom()->coinflip()) ? btree->getLastCommittedVersion() : committedVersions[deterministicRandom()->randomInt(0, committedVersions.size())];
|
||||
debug_printf("Using committed version %" PRId64 "\n", v);
|
||||
// Get a cursor at v so that v doesn't get expired between the possibly serial steps below.
|
||||
state Reference<IStoreCursor> cur = btree->readAtVersion(v);
|
||||
|
||||
debug_printf("Verifying entire key range at version %" PRId64 "\n", v);
|
||||
fRangeAll = verifyRange(btree, LiteralStringRef(""), LiteralStringRef("\xff\xff"), v, written, pErrorCount);
|
||||
if(serial) {
|
||||
wait(success(fRangeAll));
|
||||
}
|
||||
|
||||
Key begin = randomKV().key;
|
||||
Key end = randomKV().key;
|
||||
debug_printf("Verifying range (%s, %s) at version %" PRId64 "\n", toString(begin).c_str(), toString(end).c_str(), v);
|
||||
fRangeRandom = verifyRange(btree, begin, end, v, written, pErrorCount);
|
||||
if(serial) {
|
||||
wait(success(fRangeRandom));
|
||||
}
|
||||
|
||||
debug_printf("Verifying seeks to each changed key at version %" PRId64 "\n", v);
|
||||
fSeekAll = seekAll(btree, v, written, pErrorCount);
|
||||
if(serial) {
|
||||
wait(success(fSeekAll));
|
||||
}
|
||||
|
||||
wait(success(fRangeAll) && success(fRangeRandom) && success(fSeekAll));
|
||||
|
||||
printf("Verified through version %" PRId64 ", %d errors\n", v, *pErrorCount);
|
||||
|
||||
if(*pErrorCount != 0)
|
||||
break;
|
||||
|
@ -5459,11 +5416,8 @@ ACTOR Future<Void> randomReader(VersionedBTree *btree) {
|
|||
state Reference<IStoreCursor> cur;
|
||||
loop {
|
||||
wait(yield());
|
||||
if(!cur || deterministicRandom()->random01() > .1) {
|
||||
if(!cur || deterministicRandom()->random01() > .01) {
|
||||
Version v = btree->getLastCommittedVersion();
|
||||
if(!btree->isSingleVersion()) {
|
||||
v = deterministicRandom()->randomInt(1, v + 1);
|
||||
}
|
||||
cur = btree->readAtVersion(v);
|
||||
}
|
||||
|
||||
|
@ -6262,7 +6216,6 @@ TEST_CASE("!/redwood/correctness/btree") {
|
|||
|
||||
state bool serialTest = deterministicRandom()->coinflip();
|
||||
state bool shortTest = deterministicRandom()->coinflip();
|
||||
state bool singleVersion = true; // Multi-version mode is broken / not finished
|
||||
|
||||
state int pageSize = shortTest ? 200 : (deterministicRandom()->coinflip() ? 4096 : deterministicRandom()->randomInt(200, 400));
|
||||
|
||||
|
@ -6281,7 +6234,6 @@ TEST_CASE("!/redwood/correctness/btree") {
|
|||
printf("\n");
|
||||
printf("serialTest: %d\n", serialTest);
|
||||
printf("shortTest: %d\n", shortTest);
|
||||
printf("singleVersion: %d\n", serialTest);
|
||||
printf("pageSize: %d\n", pageSize);
|
||||
printf("maxKeySize: %d\n", maxKeySize);
|
||||
printf("maxValueSize: %d\n", maxValueSize);
|
||||
|
@ -6299,7 +6251,7 @@ TEST_CASE("!/redwood/correctness/btree") {
|
|||
printf("Initializing...\n");
|
||||
state double startTime = now();
|
||||
pager = new DWALPager(pageSize, pagerFile, 0);
|
||||
state VersionedBTree *btree = new VersionedBTree(pager, pagerFile, singleVersion);
|
||||
state VersionedBTree *btree = new VersionedBTree(pager, pagerFile);
|
||||
wait(btree->init());
|
||||
|
||||
state std::map<std::pair<std::string, Version>, Optional<std::string>> written;
|
||||
|
@ -6493,7 +6445,7 @@ TEST_CASE("!/redwood/correctness/btree") {
|
|||
|
||||
printf("Reopening btree from disk.\n");
|
||||
IPager2 *pager = new DWALPager(pageSize, pagerFile, 0);
|
||||
btree = new VersionedBTree(pager, pagerFile, singleVersion);
|
||||
btree = new VersionedBTree(pager, pagerFile);
|
||||
wait(btree->init());
|
||||
|
||||
Version v = btree->getLatestVersion();
|
||||
|
@ -6622,22 +6574,21 @@ TEST_CASE("!/redwood/performance/set") {
|
|||
state int pageSize = 4096;
|
||||
state int64_t pageCacheBytes = FLOW_KNOBS->PAGE_CACHE_4K;
|
||||
DWALPager *pager = new DWALPager(pageSize, pagerFile, pageCacheBytes);
|
||||
state bool singleVersion = true;
|
||||
state VersionedBTree *btree = new VersionedBTree(pager, pagerFile, singleVersion);
|
||||
state VersionedBTree *btree = new VersionedBTree(pager, pagerFile);
|
||||
wait(btree->init());
|
||||
|
||||
state int nodeCount = 1e9;
|
||||
state int maxChangesPerVersion = 5000;
|
||||
state int64_t kvBytesTarget = 4e9;
|
||||
state int commitTarget = 20e6;
|
||||
state int minKeyPrefixBytes = 0;
|
||||
state int minKeyPrefixBytes = 25;
|
||||
state int maxKeyPrefixBytes = 25;
|
||||
state int minValueSize = 0;
|
||||
state int maxValueSize = 500;
|
||||
state int maxConsecutiveRun = 10;
|
||||
state int minValueSize = 1000;
|
||||
state int maxValueSize = 2000;
|
||||
state int minConsecutiveRun = 1000;
|
||||
state int maxConsecutiveRun = 2000;
|
||||
state char firstKeyChar = 'a';
|
||||
state char lastKeyChar = 'b';
|
||||
state char lastKeyChar = 'm';
|
||||
|
||||
printf("pageSize: %d\n", pageSize);
|
||||
printf("pageCacheBytes: %" PRId64 "\n", pageCacheBytes);
|
||||
|
|
Loading…
Reference in New Issue