Several bug fixes.

This commit is contained in:
Stephen Atherton 2017-08-25 15:48:32 -07:00
parent 888093463b
commit 1070a5ed66
1 changed files with 20 additions and 18 deletions

View File

@ -159,7 +159,6 @@ struct KeyVersionValue {
}
static inline KeyVersionValue unpack(KeyRef key) {
debug_printf("Unpacking '%s'\n", printable(key).c_str());
if(key.size() == 0)
return KeyVersionValue(KeyRef(), 0);
return unpack(KeyValueRef(key, ValueRef()));
@ -199,8 +198,9 @@ public:
SingleKeyMutationsByVersion &changes = insertMutationBoundary(keyValue.key)->second.startKeyMutations;
// Add the set if the changes set is empty or the last entry isn't a set to exactly the same value
if(changes.empty() || !changes.rbegin()->second.equalToSet(keyValue.value))
if(changes.empty() || !changes.rbegin()->second.equalToSet(keyValue.value)) {
changes[m_writeVersion] = SingleKeyMutation(keyValue.value);
}
}
virtual void clear(KeyRangeRef range) {
ASSERT(m_writeVersion != invalidVersion);
@ -327,7 +327,7 @@ private:
}
std::string toString() const {
return format("op=%d val=%s", op, printable(value).c_str());
return format("op=%d val='%s'", op, printable(value).c_str());
}
};
@ -446,7 +446,7 @@ private:
// boundary exists and the only way to have found that is to look explicitly
// for it in which case we would have returned above.
MutationBufferT::iterator iPrevious = ib;
--ib;
--iPrevious;
ib->second.rangeClearVersion = iPrevious->second.rangeClearVersion;
return ib;
@ -523,14 +523,11 @@ private:
// subtree but have no changes in it
MutationBufferT::const_iterator iMutationBoundaryNext = iMutationBoundary;
++iMutationBoundaryNext;
if(iMutationBoundaryNext->second.noChanges()) {
if(iMutationBoundaryNext == iMutationBoundaryEnd && iMutationBoundaryNext->second.noChanges()) {
debug_printf("%s no changes because sole mutation range was empty\n", printPrefix.c_str());
return VersionedChildrenT({ {0,{{lowerBoundKey,root}}} });
}
debug_printf("%s MUTATION BUFFER:\n", printPrefix.c_str());
self->printMutationBuffer(iMutationBoundary, iMutationBoundaryEnd);
state FixedSizeMap map;
Reference<const IPage> rawPage = wait(snapshot->getPhysicalPage(root));
map = FixedSizeMap::decode(StringRef(rawPage->begin(), rawPage->size()));
@ -540,6 +537,9 @@ private:
VersionedChildrenT results;
FixedSizeMap::KVPairsT merged;
debug_printf("MERGING EXISTING DATA WITH MUTATIONS:\n");
self->printMutationBuffer(iMutationBoundary, iMutationBoundaryEnd);
SimpleFixedSizeMapRef::KVPairsT::const_iterator iExisting = map.entries.begin();
SimpleFixedSizeMapRef::KVPairsT::const_iterator iExistingEnd = map.entries.end();
@ -582,6 +582,8 @@ private:
// Output mutations for the mutation boundary start key
while(iMutations != iMutationsEnd) {
if(iMutations->first < minVersion)
minVersion = iMutations->first;
merged.push_back(iMutations->second.toKVV(iMutationBoundary->first, iMutations->first).pack());
debug_printf("Added mutation of boundary start key: %s\n", KeyVersionValue::unpack(merged.back()).toString().c_str());
++iMutations;
@ -614,16 +616,16 @@ private:
if(iExisting != iExistingEnd)
existing = nextEntry;
}
}
// Write any remaining existing keys, which are not subject to clears as they are beyond the cleared range.
while(iExisting != iExistingEnd) {
merged.push_back(existing.pack());
debug_printf("Added existing tail key: %s\n", KeyVersionValue::unpack(merged.back()).toString().c_str());
// Write any remaining existing keys, which are not subject to clears as they are beyond the cleared range.
while(iExisting != iExistingEnd) {
merged.push_back(existing.pack());
debug_printf("Added existing tail key: %s\n", KeyVersionValue::unpack(merged.back()).toString().c_str());
++iExisting;
if(iExisting != iExistingEnd)
existing = KeyVersionValue::unpack(*iExisting);
}
++iExisting;
if(iExisting != iExistingEnd)
existing = KeyVersionValue::unpack(*iExisting);
}
debug_printf("%s DONE MERGING MUTATIONS WITH EXISTING LEAF CONTENTS\n", printPrefix.c_str());
@ -880,7 +882,7 @@ private:
debug_printf("findEqual: Reading page %d\n", pageNumber);
Reference<const IPage> rawPage = wait(self->m_pager->getPhysicalPage(pageNumber));
FixedSizeMap map = FixedSizeMap::decode(StringRef(rawPage->begin(), rawPage->size()));
debug_printf("Read page %d @%lld: %s\n", pageNumber, self->m_version, map.toString().c_str());
//debug_printf("Read page %d @%lld: %s\n", pageNumber, self->m_version, map.toString().c_str());
// Special case of empty page (which should only happen for root)
if(map.entries.empty()) {
@ -1120,7 +1122,7 @@ std::string SimpleFixedSizeMapRef::toString() const {
if(i != 0)
result.append(",");
if(t.getType(i) == Tuple::ElementType::BYTES)
result.append(format("%s", t.getString(i).toString().c_str()));
result.append(format("'%s'", t.getString(i).toString().c_str()));
if(t.getType(i) == Tuple::ElementType::INT)
result.append(format("%lld", t.getInt(i)));
}