Bug fixes. Memory corruption was happening due to memory ownership problems with using PrefixTree cursors. Now that internal page boundaries are truncated to the minimal necessary length they can be incomplete tuples of (key, version, offset) which was not being handled correctly (causing incorrect mutation buffer to page mappings during commit) for some partial tuple values. Tuple was modified to allow optional safe decoding of partial integer values and to assert if partial integer values are found but not allowed. Lots of debug output changes.

This commit is contained in:
Stephen Atherton 2018-07-18 03:19:35 -07:00
parent 540d88e85f
commit 7397ea4a79
4 changed files with 61 additions and 35 deletions

View File

@ -29,7 +29,7 @@ static size_t find_string_terminator(const StringRef data, size_t offset) {
return i;
}
Tuple::Tuple(StringRef const& str, bool allow_incomplete) {
Tuple::Tuple(StringRef const& str, bool exclude_incomplete) {
data.append(data.arena(), str.begin(), str.size());
size_t i = 0;
@ -51,12 +51,12 @@ Tuple::Tuple(StringRef const& str, bool allow_incomplete) {
}
// If incomplete tuples are allowed, remove the last offset if i is now beyond size()
// Strings will never be considered incomplete due to the way the string end is found.
if(allow_incomplete && i > data.size())
if(exclude_incomplete && i > data.size())
offsets.pop_back();
}
Tuple Tuple::unpack(StringRef const& str, bool allow_incomplete) {
return Tuple(str, allow_incomplete);
Tuple Tuple::unpack(StringRef const& str, bool exclude_incomplete) {
return Tuple(str, exclude_incomplete);
}
Tuple& Tuple::append(Tuple const& tuple) {
@ -187,7 +187,7 @@ Standalone<StringRef> Tuple::getString(size_t index) const {
return result;
}
int64_t Tuple::getInt(size_t index) const {
int64_t Tuple::getInt(size_t index, bool allow_incomplete) const {
if(index >= offsets.size()) {
throw invalid_tuple_index();
}
@ -209,7 +209,21 @@ int64_t Tuple::getInt(size_t index) const {
}
memset( &swap, neg ? '\xff' : 0, 8 - len );
memcpy( ((uint8_t*)&swap) + 8 - len, data.begin() + offsets[index] + 1, len );
// presentLen is how many of len bytes are actually present, it will be < len if the encoded tuple was truncated
int presentLen = std::min<int8_t>(len, data.size() - offsets[index] - 1);
ASSERT(len == presentLen || allow_incomplete);
memcpy( ((uint8_t*)&swap) + 8 - len, data.begin() + offsets[index] + 1, presentLen );
if(presentLen < len) {
int suffix = len - presentLen;
if(presentLen == 0) {
// The first byte in an int would always be at least 1, because if was 0 then a shorter int type would have been used.
// So if we don't have the first (most significant) byte in the encoded string, use 1 so that the decoded result
// maintains the encoded form's sort order with an encoded value of a shorter and same-signed type.
*( ((uint8_t*)&swap) + 8 - len) = 1;
--suffix; // The suffix to clear below is now 1 byte shorter.
}
memset( ((uint8_t*)&swap) + 8 - suffix, 0, suffix );
}
swap = bigEndian64( swap );

View File

@ -29,10 +29,11 @@
struct Tuple {
Tuple() {}
// allow_incomplete will enable parsing of a Tuple which ends with an incomplete numeric field, which will be excluded from the result.
// Strings can't be incomplete because they are parsed such that the end of the packed bytes is the end of the string in lieu of a
// specific end.
static Tuple unpack(StringRef const& str, bool allow_incomplete = false);
// Tuple parsing normally does not care of the final value is a numeric type and is incomplete.
// The exclude_incomplete will exclude such incomplete final numeric tuples from the result.
// Note that strings can't be incomplete because they are parsed such that the end of the packed
// byte string is considered the end of the string in lieu of a specific end.
static Tuple unpack(StringRef const& str, bool exclude_incomplete = false);
Tuple& append(Tuple const& tuple);
Tuple& append(StringRef const& str, bool utf8=false);
@ -53,14 +54,14 @@ struct Tuple {
ElementType getType(size_t index) const;
Standalone<StringRef> getString(size_t index) const;
int64_t getInt(size_t index) const;
int64_t getInt(size_t index, bool allow_incomplete = false) const;
KeyRange range(Tuple const& tuple = Tuple()) const;
Tuple subTuple(size_t beginIndex, size_t endIndex = std::numeric_limits<size_t>::max()) const;
private:
Tuple(const StringRef& data, bool allow_incomplete = false);
Tuple(const StringRef& data, bool exclude_incomplete = false);
Standalone<VectorRef<uint8_t>> data;
std::vector<size_t> offsets;
};

View File

@ -341,7 +341,7 @@ private:
int n = _node->getPrefixLen();
prefixBuffer.reserve(prefixBuffer.arena(), n);
prefix = prefixSource->key(n, prefixBuffer.begin());
prefix = prefixSource->key(n, prefixBuffer.begin(), false);
}
inline bool valid() const {
@ -378,10 +378,15 @@ private:
}
// Extract size bytes from the reconstituted key
StringRef key(int size, uint8_t *dst) const {
StringRef key(int size, uint8_t *dst, bool alwaysCopy) const {
// If size is less than prefix length then return a substring of it without copying anything
if(size <= prefix.size()) {
return prefix.substr(0, size);
StringRef s = prefix.substr(0, size);
if(alwaysCopy) {
memcpy(dst, s.begin(), s.size());
s = StringRef(dst, size);
}
return s;
}
ASSERT(node != nullptr);
@ -455,7 +460,7 @@ public:
const PathEntry &p = pathBack();
int n = p.node->getKeySize();
uint8_t *wptr = new (arena) uint8_t[n];
return pathBack().key(n, wptr);
return pathBack().key(n, wptr, true);
}
Standalone<StringRef> getKey() const {
@ -474,7 +479,7 @@ public:
const PathEntry &p = pathBack();
int n = p.node->getKeySize();
uint8_t *wptr = new (kv.arena()) uint8_t[n];
kv.key = pathBack().key(n, wptr);
kv.key = pathBack().key(n, wptr, true);
kv.value = getValue();
return kv;
}

View File

@ -55,18 +55,24 @@ struct BTreePage {
do {
r += "\n ";
Tuple t = Tuple::unpack(c.getKey(), true);
for(int i = 0; i < t.size(); ++i) {
if(i != 0)
r += ",";
if(t.getType(i) == Tuple::ElementType::BYTES)
r += format("'%s'", t.getString(i).printable().c_str());
if(t.getType(i) == Tuple::ElementType::INT)
r += format("%lld", t.getInt(i));
Tuple t;
try {
t = Tuple::unpack(c.getKey());
for(int i = 0; i < t.size(); ++i) {
if(i != 0)
r += ",";
if(t.getType(i) == Tuple::ElementType::BYTES)
r += format("'%s'", t.getString(i).printable().c_str());
if(t.getType(i) == Tuple::ElementType::INT)
r += format("%lld", t.getInt(i, true));
}
} catch(Error &e) {
}
r += format("['%s']", c.getKey().toHexString().c_str());
r += " -> ";
if(flags && IS_LEAF)
r += format("'%s'", c.getValue().printable().c_str());
r += format("'%s'", c.getValue().toHexString().c_str());
else
r += format("Page %u", *(const uint32_t *)c.getValue().begin());
@ -205,17 +211,17 @@ struct KeyVersionValue {
// Supports partial/incomplete encoded sequences.
static inline KeyVersionValue unpack(KeyValueRef kv) {
//debug_printf("Unpacking: '%s' -> '%s' \n", kv.key.toHexString().c_str(), kv.value.toHexString().c_str());
debug_printf("Unpacking: '%s' -> '%s' \n", kv.key.toHexString().c_str(), kv.value.toHexString().c_str());
KeyVersionValue result;
if(kv.key.size() != 0) {
//debug_printf("KeyVersionValue::unpack: %s\n", kv.key.toHexString().c_str());
Tuple k = Tuple::unpack(kv.key, true);
Tuple k = Tuple::unpack(kv.key);
if(k.size() >= 1) {
result.key = k.getString(0);
if(k.size() >= 2) {
result.version = k.getInt(1);
result.version = k.getInt(1, true);
if(k.size() >= 3) {
result.valueIndex = k.getInt(2);
result.valueIndex = k.getInt(2, true);
}
}
}
@ -632,8 +638,8 @@ private:
Reference<const IPage> rawPage = wait(snapshot->getPhysicalPage(root));
state BTreePage *page = (BTreePage *) rawPage->begin();
debug_printf("commitSubtree: page read: id=%d ver=%lld lower='%s' upper='%s'\n", root, snapshot->getVersion(), lowerBoundKey.toHexString().c_str(), upperBoundKey.toHexString().c_str());
debug_printf("commitSubtree: page read: id=%d %s\n", root, page->toString(lowerBoundKey, upperBoundKey).c_str());
debug_printf("%p commitSubtree: page read: id=%d ver=%lld lower='%s' upper='%s'\n", this, root, snapshot->getVersion(), lowerBoundKey.toHexString().c_str(), upperBoundKey.toHexString().c_str());
debug_printf("%p commitSubtree: page read: id=%d %s\n", this, root, page->toString(lowerBoundKey, upperBoundKey).c_str());
PrefixTree::Cursor existingCursor = page->tree.getCursor(lowerBoundKey, upperBoundKey);
bool existingCursorValid = existingCursor.moveFirst();
@ -642,7 +648,7 @@ private:
VersionedChildrenT results;
PrefixTree::EntriesT merged;
debug_printf("MERGING EXISTING DATA WITH MUTATIONS:\n");
debug_printf("%p MERGING EXISTING DATA WITH MUTATIONS:\n", this);
self->printMutationBuffer(iMutationBoundary, iMutationBoundaryEnd);
// It's a given that the mutation map is not empty so it's safe to do this
@ -685,7 +691,7 @@ private:
existing = KeyVersionValue::unpack(existingCursor.getKV());
}
// TODO: If a mutation set is equal to the previous existing value of the key, don't write it.
// TODO: If a mutation set is equal to the previous existing value of the key, maybe don't write it.
// Output mutations for the mutation boundary start key
while(iMutations != iMutationsEnd) {
const SingleKeyMutation &m = iMutations->second;
@ -829,7 +835,7 @@ private:
existingCursorValid = existingCursor.moveNext();
// The upper bound for the last child is upperBoundKey, and entries[i+1] for the others.
// The upper bound for the last child is upperBoundKey, and the cursor's next key for the others.
Key childUpperBound = existingCursorValid ? existingCursor.getKey() : upperBoundKey;
ASSERT(childLowerBound <= childUpperBound);