commit
a2afda9002
|
@ -2204,6 +2204,7 @@ struct SplitStringRef {
|
|||
// A BTree "page id" is actually a list of LogicalPageID's whose contents should be concatenated together.
|
||||
// NOTE: Uses host byte order
|
||||
typedef VectorRef<LogicalPageID> BTreePageIDRef;
|
||||
constexpr LogicalPageID maxPageID = (LogicalPageID)-1;
|
||||
|
||||
std::string toString(BTreePageIDRef id) {
|
||||
return std::string("BTreePageID") + toString(id.begin(), id.end());
|
||||
|
@ -2246,6 +2247,10 @@ struct RedwoodRecordRef {
|
|||
|
||||
inline RedwoodRecordRef withoutValue() const { return RedwoodRecordRef(key, version); }
|
||||
|
||||
inline RedwoodRecordRef withMaxPageID() const {
|
||||
return RedwoodRecordRef(key, version, StringRef((uint8_t *)&maxPageID, sizeof(maxPageID)));
|
||||
}
|
||||
|
||||
// Truncate (key, version, part) tuple to len bytes.
|
||||
void truncate(int len) {
|
||||
ASSERT(len <= key.size());
|
||||
|
@ -3872,7 +3877,7 @@ private:
|
|||
// If the decode upper boundary is the subtree upper boundary the pointers will be the same
|
||||
// For the lower boundary, if the pointers are not the same there is still a possibility
|
||||
// that the keys are the same. This happens for the first remaining subtree of an internal page
|
||||
// after the previous first subtree was cleared.
|
||||
// after the prior subtree(s) were cleared.
|
||||
return (decodeUpperBound == subtreeUpperBound) &&
|
||||
(decodeLowerBound == subtreeLowerBound || decodeLowerBound->sameExceptValue(*subtreeLowerBound));
|
||||
}
|
||||
|
@ -4984,6 +4989,246 @@ public:
|
|||
Future<bool> moveLast() { return move_end(this, false); }
|
||||
};
|
||||
|
||||
// Cursor designed for short lifespans.
|
||||
// Holds references to all pages touched.
|
||||
// All record references returned from it are valid until the cursor is destroyed.
|
||||
class BTreeCursor {
|
||||
Arena arena;
|
||||
Reference<IPagerSnapshot> pager;
|
||||
std::unordered_map<LogicalPageID, Reference<const IPage>> pages;
|
||||
VersionedBTree* btree;
|
||||
bool valid;
|
||||
|
||||
struct PathEntry {
|
||||
BTreePage* btPage;
|
||||
BTreePage::BinaryTree::Cursor cursor;
|
||||
};
|
||||
VectorRef<PathEntry> path;
|
||||
|
||||
public:
|
||||
BTreeCursor() {}
|
||||
|
||||
bool isValid() const { return valid; }
|
||||
|
||||
std::string toString() const {
|
||||
std::string r;
|
||||
for (int i = 0; i < path.size(); ++i) {
|
||||
r += format("[%d/%d: %s] ", i + 1, path.size(),
|
||||
path[i].cursor.valid() ? path[i].cursor.get().toString(path[i].btPage->isLeaf()).c_str()
|
||||
: "<invalid>");
|
||||
}
|
||||
if (!valid) {
|
||||
r += " (invalid) ";
|
||||
}
|
||||
return r;
|
||||
}
|
||||
|
||||
const RedwoodRecordRef& get() { return path.back().cursor.get(); }
|
||||
|
||||
bool inRoot() const { return path.size() == 1; }
|
||||
|
||||
// Pop and return the page cursor at the end of the path.
|
||||
// This is meant to enable range scans to consume the contents of a leaf page more efficiently.
|
||||
// Can only be used when inRoot() is true.
|
||||
BTreePage::BinaryTree::Cursor popPath() {
|
||||
BTreePage::BinaryTree::Cursor c = path.back().cursor;
|
||||
path.pop_back();
|
||||
return c;
|
||||
}
|
||||
|
||||
Future<Void> pushPage(BTreePageIDRef id, const RedwoodRecordRef& lowerBound,
|
||||
const RedwoodRecordRef& upperBound) {
|
||||
Reference<const IPage>& page = pages[id.front()];
|
||||
if (page.isValid()) {
|
||||
path.push_back(arena, { (BTreePage*)page->begin(), getCursor(page) });
|
||||
return Void();
|
||||
}
|
||||
|
||||
return map(readPage(pager, id, &lowerBound, &upperBound), [this, &page, id](Reference<const IPage> p) {
|
||||
page = p;
|
||||
path.push_back(arena, { (BTreePage*)p->begin(), getCursor(p) });
|
||||
return Void();
|
||||
});
|
||||
}
|
||||
|
||||
Future<Void> pushPage(BTreePage::BinaryTree::Cursor c) {
|
||||
const RedwoodRecordRef& rec = c.get();
|
||||
auto next = c;
|
||||
next.moveNext();
|
||||
BTreePageIDRef id = rec.getChildPage();
|
||||
return pushPage(id, rec, next.getOrUpperBound());
|
||||
}
|
||||
|
||||
Future<Void> init(VersionedBTree* btree_in, Reference<IPagerSnapshot> pager_in, BTreePageIDRef root) {
|
||||
btree = btree_in;
|
||||
pager = pager_in;
|
||||
path.reserve(arena, 6);
|
||||
valid = false;
|
||||
return pushPage(root, dbBegin, dbEnd);
|
||||
}
|
||||
|
||||
// Seeks cursor to query if it exists, the record before or after it, or an undefined and invalid
|
||||
// position between those records
|
||||
// If 0 is returned, then
|
||||
// If the cursor is valid then it points to query
|
||||
// If the cursor is not valid then the cursor points to some place in the btree such that
|
||||
// If there is a record in the tree < query then movePrev() will move to it, and
|
||||
// If there is a record in the tree > query then moveNext() will move to it.
|
||||
// If non-zero is returned then the cursor is valid and the return value is logically equivalent
|
||||
// to query.compare(cursor.get())
|
||||
ACTOR Future<int> seek_impl(BTreeCursor* self, RedwoodRecordRef query, int prefetchBytes) {
|
||||
state RedwoodRecordRef internalPageQuery = query.withMaxPageID();
|
||||
self->path = self->path.slice(0, 1);
|
||||
debug_printf("seek(%s, %d) start cursor = %s\n", query.toString().c_str(), prefetchBytes,
|
||||
self->toString().c_str());
|
||||
|
||||
loop {
|
||||
auto& entry = self->path.back();
|
||||
if (entry.btPage->isLeaf()) {
|
||||
int cmp = entry.cursor.seek(query);
|
||||
self->valid = entry.cursor.valid() && !entry.cursor.node->isDeleted();
|
||||
debug_printf("seek(%s, %d) loop exit cmp=%d cursor=%s\n", query.toString().c_str(), prefetchBytes,
|
||||
cmp, self->toString().c_str());
|
||||
return self->valid ? cmp : 0;
|
||||
}
|
||||
|
||||
// Internal page, so seek to the branch where query must be
|
||||
// Currently, after a subtree deletion internal page boundaries are still strictly adhered
|
||||
// to and will be updated if anything is inserted into the cleared range, so if the seek fails
|
||||
// or it finds an entry with a null child page then query does not exist in the BTree.
|
||||
if (entry.cursor.seekLessThan(internalPageQuery) && entry.cursor.get().value.present()) {
|
||||
debug_printf("seek(%s, %d) loop seek success cursor=%s\n", query.toString().c_str(), prefetchBytes,
|
||||
self->toString().c_str());
|
||||
Future<Void> f = self->pushPage(entry.cursor);
|
||||
|
||||
// Prefetch siblings, at least prefetchBytes, at level 2 but without jumping to another level 2
|
||||
// sibling
|
||||
if (prefetchBytes != 0 && entry.btPage->height == 2) {
|
||||
auto c = entry.cursor;
|
||||
bool fwd = prefetchBytes > 0;
|
||||
prefetchBytes = abs(prefetchBytes);
|
||||
// While we should still preload more bytes and a move in the target direction is successful
|
||||
while (prefetchBytes > 0 && (fwd ? c.moveNext() : c.movePrev())) {
|
||||
// If there is a page link, preload it.
|
||||
if (c.get().value.present()) {
|
||||
BTreePageIDRef childPage = c.get().getChildPage();
|
||||
preLoadPage(self->pager.getPtr(), childPage);
|
||||
prefetchBytes -= self->btree->m_blockSize * childPage.size();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
wait(f);
|
||||
} else {
|
||||
self->valid = false;
|
||||
debug_printf("seek(%s, %d) loop exit cmp=0 cursor=%s\n", query.toString().c_str(), prefetchBytes,
|
||||
self->toString().c_str());
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Future<int> seek(RedwoodRecordRef query, int prefetchBytes) { return seek_impl(this, query, prefetchBytes); }
|
||||
|
||||
ACTOR Future<Void> seekGTE_impl(BTreeCursor* self, RedwoodRecordRef query, int prefetchBytes) {
|
||||
debug_printf("seekGTE(%s, %d) start\n", query.toString().c_str(), prefetchBytes);
|
||||
int cmp = wait(self->seek(query, prefetchBytes));
|
||||
if (cmp > 0 || (cmp == 0 && !self->isValid())) {
|
||||
wait(self->moveNext());
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<Void> seekGTE(RedwoodRecordRef query, int prefetchBytes) {
|
||||
return seekGTE_impl(this, query, prefetchBytes);
|
||||
}
|
||||
|
||||
ACTOR Future<Void> seekLT_impl(BTreeCursor* self, RedwoodRecordRef query, int prefetchBytes) {
|
||||
debug_printf("seekLT(%s, %d) start\n", query.toString().c_str(), prefetchBytes);
|
||||
int cmp = wait(self->seek(query, prefetchBytes));
|
||||
if (cmp <= 0) {
|
||||
wait(self->movePrev());
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<Void> seekLT(RedwoodRecordRef query, int prefetchBytes) {
|
||||
return seekLT_impl(this, query, -prefetchBytes);
|
||||
}
|
||||
|
||||
ACTOR Future<Void> move_impl(BTreeCursor* self, bool forward) {
|
||||
// Try to the move cursor at the end of the path in the correct direction
|
||||
debug_printf("move%s() start cursor=%s\n", forward ? "Next" : "Prev", self->toString().c_str());
|
||||
while (1) {
|
||||
debug_printf("move%s() first loop cursor=%s\n", forward ? "Next" : "Prev", self->toString().c_str());
|
||||
auto& entry = self->path.back();
|
||||
bool success;
|
||||
if(entry.cursor.valid()) {
|
||||
success = forward ? entry.cursor.moveNext() : entry.cursor.movePrev();
|
||||
} else {
|
||||
success = forward ? entry.cursor.moveFirst() : false;
|
||||
}
|
||||
|
||||
// Skip over internal page entries that do not link to child pages. There should never be two in a row.
|
||||
if (success && !entry.btPage->isLeaf() && !entry.cursor.get().value.present()) {
|
||||
success = forward ? entry.cursor.moveNext() : entry.cursor.movePrev();
|
||||
ASSERT(!success || entry.cursor.get().value.present());
|
||||
}
|
||||
|
||||
// Stop if successful
|
||||
if (success) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (self->path.size() == 1) {
|
||||
self->valid = false;
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Move to parent
|
||||
self->path = self->path.slice(0, self->path.size() - 1);
|
||||
}
|
||||
|
||||
// While not on a leaf page, move down to get to one.
|
||||
while (1) {
|
||||
debug_printf("move%s() second loop cursor=%s\n", forward ? "Next" : "Prev", self->toString().c_str());
|
||||
auto& entry = self->path.back();
|
||||
if (entry.btPage->isLeaf()) {
|
||||
break;
|
||||
}
|
||||
|
||||
// The last entry in an internal page could be a null link, if so move back
|
||||
if (!forward && !entry.cursor.get().value.present()) {
|
||||
ASSERT(entry.cursor.movePrev());
|
||||
ASSERT(entry.cursor.get().value.present());
|
||||
}
|
||||
|
||||
wait(self->pushPage(entry.cursor));
|
||||
auto& newEntry = self->path.back();
|
||||
ASSERT(forward ? newEntry.cursor.moveFirst() : newEntry.cursor.moveLast());
|
||||
}
|
||||
|
||||
self->valid = true;
|
||||
|
||||
debug_printf("move%s() exit cursor=%s\n", forward ? "Next" : "Prev", self->toString().c_str());
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<Void> moveNext() { return move_impl(this, true); }
|
||||
Future<Void> movePrev() { return move_impl(this, false); }
|
||||
};
|
||||
|
||||
Future<Void> initBTreeCursor(BTreeCursor* cursor, Version snapshotVersion) {
|
||||
// Only committed versions can be read.
|
||||
ASSERT(snapshotVersion <= m_lastCommittedVersion);
|
||||
Reference<IPagerSnapshot> snapshot = m_pager->getReadSnapshot(snapshotVersion);
|
||||
|
||||
// This is a ref because snapshot will continue to hold the metakey value memory
|
||||
KeyRef m = snapshot->getMetaKey();
|
||||
|
||||
return cursor->init(this, snapshot, ((MetaKey*)m.begin())->root.get());
|
||||
}
|
||||
|
||||
// Cursor is for reading and interating over user visible KV pairs at a specific version
|
||||
// KeyValueRefs returned become invalid once the cursor is moved
|
||||
class Cursor : public IStoreCursor, public ReferenceCounted<Cursor>, public FastAllocated<Cursor>, NonCopyable {
|
||||
|
@ -5264,10 +5509,13 @@ public:
|
|||
|
||||
ACTOR static Future<Standalone<RangeResultRef>> readRange_impl(KeyValueStoreRedwoodUnversioned* self, KeyRange keys,
|
||||
int rowLimit, int byteLimit) {
|
||||
state VersionedBTree::BTreeCursor cur;
|
||||
wait(self->m_tree->initBTreeCursor(&cur, self->m_tree->getLastCommittedVersion()));
|
||||
|
||||
wait(self->m_concurrentReads.take());
|
||||
state FlowLock::Releaser releaser(self->m_concurrentReads);
|
||||
|
||||
++g_redwoodMetrics.opGetRange;
|
||||
|
||||
state Standalone<RangeResultRef> result;
|
||||
state int accumulatedBytes = 0;
|
||||
ASSERT(byteLimit > 0);
|
||||
|
@ -5276,33 +5524,58 @@ public:
|
|||
return result;
|
||||
}
|
||||
|
||||
state Reference<IStoreCursor> cur = self->m_tree->readAtVersion(self->m_tree->getLastCommittedVersion());
|
||||
// Prefetch is currently only done in the forward direction
|
||||
state int prefetchBytes = rowLimit > 1 ? byteLimit : 0;
|
||||
// Prefetch is disabled for now pending some decent logic for deciding how much to fetch
|
||||
state int prefetchBytes = 0;
|
||||
|
||||
if (rowLimit > 0) {
|
||||
wait(cur->findFirstEqualOrGreater(keys.begin, prefetchBytes));
|
||||
while (cur->isValid() && cur->getKey() < keys.end) {
|
||||
KeyValueRef kv(KeyRef(result.arena(), cur->getKey()), ValueRef(result.arena(), cur->getValue()));
|
||||
wait(cur.seekGTE(keys.begin, prefetchBytes));
|
||||
while (cur.isValid()) {
|
||||
// Read page contents without using waits
|
||||
bool isRoot = cur.inRoot();
|
||||
BTreePage::BinaryTree::Cursor leafCursor = cur.popPath();
|
||||
while(leafCursor.valid()) {
|
||||
KeyValueRef kv = leafCursor.get().toKeyValueRef();
|
||||
if(kv.key >= keys.end) {
|
||||
break;
|
||||
}
|
||||
accumulatedBytes += kv.expectedSize();
|
||||
result.push_back(result.arena(), kv);
|
||||
result.push_back_deep(result.arena(), kv);
|
||||
if (--rowLimit == 0 || accumulatedBytes >= byteLimit) {
|
||||
break;
|
||||
}
|
||||
wait(cur->next());
|
||||
leafCursor.moveNext();
|
||||
}
|
||||
// Stop if the leaf cursor is still valid which means we hit a key or size limit or
|
||||
// if we started in the root page
|
||||
if(leafCursor.valid() || isRoot) {
|
||||
break;
|
||||
}
|
||||
wait(cur.moveNext());
|
||||
}
|
||||
} else {
|
||||
wait(cur->findLastLessOrEqual(keys.end));
|
||||
if (cur->isValid() && cur->getKey() == keys.end) wait(cur->prev());
|
||||
|
||||
while (cur->isValid() && cur->getKey() >= keys.begin) {
|
||||
KeyValueRef kv(KeyRef(result.arena(), cur->getKey()), ValueRef(result.arena(), cur->getValue()));
|
||||
wait(cur.seekLT(keys.end, prefetchBytes));
|
||||
while (cur.isValid()) {
|
||||
// Read page contents without using waits
|
||||
bool isRoot = cur.inRoot();
|
||||
BTreePage::BinaryTree::Cursor leafCursor = cur.popPath();
|
||||
while(leafCursor.valid()) {
|
||||
KeyValueRef kv = leafCursor.get().toKeyValueRef();
|
||||
if(kv.key < keys.begin) {
|
||||
break;
|
||||
}
|
||||
accumulatedBytes += kv.expectedSize();
|
||||
result.push_back(result.arena(), kv);
|
||||
result.push_back_deep(result.arena(), kv);
|
||||
if (++rowLimit == 0 || accumulatedBytes >= byteLimit) {
|
||||
break;
|
||||
}
|
||||
wait(cur->prev());
|
||||
leafCursor.movePrev();
|
||||
}
|
||||
// Stop if the leaf cursor is still valid which means we hit a key or size limit or
|
||||
// if we started in the root page
|
||||
if(leafCursor.valid() || isRoot) {
|
||||
break;
|
||||
}
|
||||
wait(cur.movePrev());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -5316,15 +5589,16 @@ public:
|
|||
|
||||
ACTOR static Future<Optional<Value>> readValue_impl(KeyValueStoreRedwoodUnversioned* self, Key key,
|
||||
Optional<UID> debugID) {
|
||||
state VersionedBTree::BTreeCursor cur;
|
||||
wait(self->m_tree->initBTreeCursor(&cur, self->m_tree->getLastCommittedVersion()));
|
||||
|
||||
wait(self->m_concurrentReads.take());
|
||||
state FlowLock::Releaser releaser(self->m_concurrentReads);
|
||||
|
||||
++g_redwoodMetrics.opGet;
|
||||
state Reference<IStoreCursor> cur = self->m_tree->readAtVersion(self->m_tree->getLastCommittedVersion());
|
||||
|
||||
wait(cur->findEqual(key));
|
||||
if (cur->isValid()) {
|
||||
return cur->getValue();
|
||||
wait(cur.seekGTE(key, 0));
|
||||
if (cur.isValid() && cur.get().key == key) {
|
||||
return cur.get().value.get();
|
||||
}
|
||||
return Optional<Value>();
|
||||
}
|
||||
|
@ -5335,18 +5609,20 @@ public:
|
|||
|
||||
ACTOR static Future<Optional<Value>> readValuePrefix_impl(KeyValueStoreRedwoodUnversioned* self, Key key,
|
||||
int maxLength, Optional<UID> debugID) {
|
||||
state VersionedBTree::BTreeCursor cur;
|
||||
wait(self->m_tree->initBTreeCursor(&cur, self->m_tree->getLastCommittedVersion()));
|
||||
|
||||
wait(self->m_concurrentReads.take());
|
||||
state FlowLock::Releaser releaser(self->m_concurrentReads);
|
||||
|
||||
++g_redwoodMetrics.opGet;
|
||||
state Reference<IStoreCursor> cur = self->m_tree->readAtVersion(self->m_tree->getLastCommittedVersion());
|
||||
|
||||
wait(cur->findEqual(key));
|
||||
if (cur->isValid()) {
|
||||
Value v = cur->getValue();
|
||||
wait(cur.seekGTE(key, 0));
|
||||
if (cur.isValid() && cur.get().key == key) {
|
||||
Value v = cur.get().value.get();
|
||||
int len = std::min(v.size(), maxLength);
|
||||
return Value(cur->getValue().substr(0, len));
|
||||
return Value(v.substr(0, len));
|
||||
}
|
||||
|
||||
return Optional<Value>();
|
||||
}
|
||||
|
||||
|
@ -5411,6 +5687,157 @@ KeyValue randomKV(int maxKeySize = 10, int maxValueSize = 5) {
|
|||
return kv;
|
||||
}
|
||||
|
||||
// Verify a range using a BTreeCursor.
|
||||
// Assumes that the BTree holds a single data version and the version is 0.
|
||||
ACTOR Future<int> verifyRangeBTreeCursor(VersionedBTree* btree, Key start, Key end, Version v,
|
||||
std::map<std::pair<std::string, Version>, Optional<std::string>>* written,
|
||||
int* pErrorCount) {
|
||||
state int errors = 0;
|
||||
if (end <= start) end = keyAfter(start);
|
||||
|
||||
state std::map<std::pair<std::string, Version>, Optional<std::string>>::const_iterator i =
|
||||
written->lower_bound(std::make_pair(start.toString(), 0));
|
||||
state std::map<std::pair<std::string, Version>, Optional<std::string>>::const_iterator iEnd =
|
||||
written->upper_bound(std::make_pair(end.toString(), 0));
|
||||
state std::map<std::pair<std::string, Version>, Optional<std::string>>::const_iterator iLast;
|
||||
|
||||
state VersionedBTree::BTreeCursor cur;
|
||||
wait(btree->initBTreeCursor(&cur, v));
|
||||
debug_printf("VerifyRange(@%" PRId64 ", %s, %s): Start\n", v, start.printable().c_str(), end.printable().c_str());
|
||||
|
||||
// Randomly use the cursor for something else first.
|
||||
if (deterministicRandom()->coinflip()) {
|
||||
state Key randomKey = randomKV().key;
|
||||
debug_printf("VerifyRange(@%" PRId64 ", %s, %s): Dummy seek to '%s'\n", v, start.printable().c_str(),
|
||||
end.printable().c_str(), randomKey.toString().c_str());
|
||||
wait(success(cur.seek(randomKey, 0)));
|
||||
}
|
||||
|
||||
debug_printf("VerifyRange(@%" PRId64 ", %s, %s): Actual seek\n", v, start.printable().c_str(),
|
||||
end.printable().c_str());
|
||||
wait(cur.seekGTE(start, 0));
|
||||
|
||||
state std::vector<KeyValue> results;
|
||||
|
||||
while (cur.isValid() && cur.get().key < end) {
|
||||
// Find the next written kv pair that would be present at this version
|
||||
while (1) {
|
||||
iLast = i;
|
||||
if (i == iEnd) break;
|
||||
++i;
|
||||
|
||||
if (iLast->first.second <= v && iLast->second.present() &&
|
||||
(i == iEnd || i->first.first != iLast->first.first || i->first.second > v)) {
|
||||
debug_printf("VerifyRange(@%" PRId64 ", %s, %s) Found key in written map: %s\n", v,
|
||||
start.printable().c_str(), end.printable().c_str(), iLast->first.first.c_str());
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (iLast == iEnd) {
|
||||
++errors;
|
||||
++*pErrorCount;
|
||||
printf("VerifyRange(@%" PRId64 ", %s, %s) ERROR: Tree key '%s' vs nothing in written map.\n", v,
|
||||
start.printable().c_str(), end.printable().c_str(), cur.get().key.toString().c_str());
|
||||
break;
|
||||
}
|
||||
|
||||
if (cur.get().key != iLast->first.first) {
|
||||
++errors;
|
||||
++*pErrorCount;
|
||||
printf("VerifyRange(@%" PRId64 ", %s, %s) ERROR: Tree key '%s' but expected '%s'\n", v,
|
||||
start.printable().c_str(), end.printable().c_str(), cur.get().key.toString().c_str(),
|
||||
iLast->first.first.c_str());
|
||||
break;
|
||||
}
|
||||
if (cur.get().value.get() != iLast->second.get()) {
|
||||
++errors;
|
||||
++*pErrorCount;
|
||||
printf("VerifyRange(@%" PRId64 ", %s, %s) ERROR: Tree key '%s' has tree value '%s' but expected '%s'\n", v,
|
||||
start.printable().c_str(), end.printable().c_str(), cur.get().key.toString().c_str(),
|
||||
cur.get().value.get().toString().c_str(), iLast->second.get().c_str());
|
||||
break;
|
||||
}
|
||||
|
||||
ASSERT(errors == 0);
|
||||
|
||||
results.push_back(KeyValue(KeyValueRef(cur.get().key, cur.get().value.get())));
|
||||
wait(cur.moveNext());
|
||||
}
|
||||
|
||||
// Make sure there are no further written kv pairs that would be present at this version.
|
||||
while (1) {
|
||||
iLast = i;
|
||||
if (i == iEnd) break;
|
||||
++i;
|
||||
if (iLast->first.second <= v && iLast->second.present() &&
|
||||
(i == iEnd || i->first.first != iLast->first.first || i->first.second > v))
|
||||
break;
|
||||
}
|
||||
|
||||
if (iLast != iEnd) {
|
||||
++errors;
|
||||
++*pErrorCount;
|
||||
printf("VerifyRange(@%" PRId64 ", %s, %s) ERROR: Tree range ended but written has @%" PRId64 " '%s'\n", v,
|
||||
start.printable().c_str(), end.printable().c_str(), iLast->first.second, iLast->first.first.c_str());
|
||||
}
|
||||
|
||||
debug_printf("VerifyRangeReverse(@%" PRId64 ", %s, %s): start\n", v, start.printable().c_str(),
|
||||
end.printable().c_str());
|
||||
|
||||
// Randomly use a new cursor at the same version for the reverse range read, if the version is still available for
|
||||
// opening new cursors
|
||||
if (v >= btree->getOldestVersion() && deterministicRandom()->coinflip()) {
|
||||
cur = VersionedBTree::BTreeCursor();
|
||||
wait(btree->initBTreeCursor(&cur, v));
|
||||
}
|
||||
|
||||
// Now read the range from the tree in reverse order and compare to the saved results
|
||||
wait(cur.seekLT(end, 0));
|
||||
|
||||
state std::vector<KeyValue>::const_reverse_iterator r = results.rbegin();
|
||||
|
||||
while (cur.isValid() && cur.get().key >= start) {
|
||||
if (r == results.rend()) {
|
||||
++errors;
|
||||
++*pErrorCount;
|
||||
printf("VerifyRangeReverse(@%" PRId64 ", %s, %s) ERROR: Tree key '%s' vs nothing in written map.\n", v,
|
||||
start.printable().c_str(), end.printable().c_str(), cur.get().key.toString().c_str());
|
||||
break;
|
||||
}
|
||||
|
||||
if (cur.get().key != r->key) {
|
||||
++errors;
|
||||
++*pErrorCount;
|
||||
printf("VerifyRangeReverse(@%" PRId64 ", %s, %s) ERROR: Tree key '%s' but expected '%s'\n", v,
|
||||
start.printable().c_str(), end.printable().c_str(), cur.get().key.toString().c_str(),
|
||||
r->key.toString().c_str());
|
||||
break;
|
||||
}
|
||||
if (cur.get().value.get() != r->value) {
|
||||
++errors;
|
||||
++*pErrorCount;
|
||||
printf("VerifyRangeReverse(@%" PRId64
|
||||
", %s, %s) ERROR: Tree key '%s' has tree value '%s' but expected '%s'\n",
|
||||
v, start.printable().c_str(), end.printable().c_str(), cur.get().key.toString().c_str(),
|
||||
cur.get().value.get().toString().c_str(), r->value.toString().c_str());
|
||||
break;
|
||||
}
|
||||
|
||||
++r;
|
||||
wait(cur.movePrev());
|
||||
}
|
||||
|
||||
if (r != results.rend()) {
|
||||
++errors;
|
||||
++*pErrorCount;
|
||||
printf("VerifyRangeReverse(@%" PRId64 ", %s, %s) ERROR: Tree range ended but written has '%s'\n", v,
|
||||
start.printable().c_str(), end.printable().c_str(), r->key.toString().c_str());
|
||||
}
|
||||
|
||||
return errors;
|
||||
}
|
||||
|
||||
ACTOR Future<int> verifyRange(VersionedBTree* btree, Key start, Key end, Version v,
|
||||
std::map<std::pair<std::string, Version>, Optional<std::string>>* written,
|
||||
int* pErrorCount) {
|
||||
|
@ -5607,6 +6034,58 @@ ACTOR Future<int> seekAll(VersionedBTree* btree, Version v,
|
|||
return errors;
|
||||
}
|
||||
|
||||
// Verify the result of point reads for every set or cleared key at the given version
|
||||
ACTOR Future<int> seekAllBTreeCursor(VersionedBTree* btree, Version v,
|
||||
std::map<std::pair<std::string, Version>, Optional<std::string>>* written, int* pErrorCount) {
|
||||
state std::map<std::pair<std::string, Version>, Optional<std::string>>::const_iterator i = written->cbegin();
|
||||
state std::map<std::pair<std::string, Version>, Optional<std::string>>::const_iterator iEnd = written->cend();
|
||||
state int errors = 0;
|
||||
state VersionedBTree::BTreeCursor cur;
|
||||
|
||||
wait(btree->initBTreeCursor(&cur, v));
|
||||
|
||||
while (i != iEnd) {
|
||||
state std::string key = i->first.first;
|
||||
state Version ver = i->first.second;
|
||||
if (ver == v) {
|
||||
state Optional<std::string> val = i->second;
|
||||
debug_printf("Verifying @%" PRId64 " '%s'\n", ver, key.c_str());
|
||||
state Arena arena;
|
||||
wait(cur.seekGTE(RedwoodRecordRef(KeyRef(arena, key), 0), 0));
|
||||
bool foundKey = cur.isValid() && cur.get().key == key;
|
||||
bool hasValue = foundKey && cur.get().value.present();
|
||||
|
||||
if (val.present()) {
|
||||
bool valueMatch = hasValue && cur.get().value.get() == val.get();
|
||||
if (!foundKey || !hasValue || !valueMatch) {
|
||||
++errors;
|
||||
++*pErrorCount;
|
||||
if (!foundKey) {
|
||||
printf("Verify ERROR: key_not_found: '%s' -> '%s' @%" PRId64 "\n", key.c_str(),
|
||||
val.get().c_str(), ver);
|
||||
}
|
||||
else if (!hasValue) {
|
||||
printf("Verify ERROR: value_not_found: '%s' -> '%s' @%" PRId64 "\n", key.c_str(),
|
||||
val.get().c_str(), ver);
|
||||
}
|
||||
else if (!valueMatch) {
|
||||
printf("Verify ERROR: value_incorrect: for '%s' found '%s' expected '%s' @%" PRId64 "\n",
|
||||
key.c_str(), cur.get().value.get().toString().c_str(), val.get().c_str(),
|
||||
ver);
|
||||
}
|
||||
}
|
||||
} else if (foundKey && hasValue) {
|
||||
++errors;
|
||||
++*pErrorCount;
|
||||
printf("Verify ERROR: cleared_key_found: '%s' -> '%s' @%" PRId64 "\n", key.c_str(),
|
||||
cur.get().value.get().toString().c_str(), ver);
|
||||
}
|
||||
}
|
||||
++i;
|
||||
}
|
||||
return errors;
|
||||
}
|
||||
|
||||
ACTOR Future<Void> verify(VersionedBTree* btree, FutureStream<Version> vStream,
|
||||
std::map<std::pair<std::string, Version>, Optional<std::string>>* written, int* pErrorCount,
|
||||
bool serial) {
|
||||
|
@ -5637,7 +6116,13 @@ ACTOR Future<Void> verify(VersionedBTree* btree, FutureStream<Version> vStream,
|
|||
state Reference<IStoreCursor> cur = btree->readAtVersion(v);
|
||||
|
||||
debug_printf("Verifying entire key range at version %" PRId64 "\n", v);
|
||||
fRangeAll = verifyRange(btree, LiteralStringRef(""), LiteralStringRef("\xff\xff"), v, written, pErrorCount);
|
||||
if(deterministicRandom()->coinflip()) {
|
||||
fRangeAll = verifyRange(btree, LiteralStringRef(""), LiteralStringRef("\xff\xff"), v, written,
|
||||
pErrorCount);
|
||||
} else {
|
||||
fRangeAll = verifyRangeBTreeCursor(btree, LiteralStringRef(""), LiteralStringRef("\xff\xff"), v, written,
|
||||
pErrorCount);
|
||||
}
|
||||
if (serial) {
|
||||
wait(success(fRangeAll));
|
||||
}
|
||||
|
@ -5646,13 +6131,21 @@ ACTOR Future<Void> verify(VersionedBTree* btree, FutureStream<Version> vStream,
|
|||
Key end = randomKV().key;
|
||||
debug_printf("Verifying range (%s, %s) at version %" PRId64 "\n", toString(begin).c_str(),
|
||||
toString(end).c_str(), v);
|
||||
if(deterministicRandom()->coinflip()) {
|
||||
fRangeRandom = verifyRange(btree, begin, end, v, written, pErrorCount);
|
||||
} else {
|
||||
fRangeRandom = verifyRangeBTreeCursor(btree, begin, end, v, written, pErrorCount);
|
||||
}
|
||||
if (serial) {
|
||||
wait(success(fRangeRandom));
|
||||
}
|
||||
|
||||
debug_printf("Verifying seeks to each changed key at version %" PRId64 "\n", v);
|
||||
if(deterministicRandom()->coinflip()) {
|
||||
fSeekAll = seekAll(btree, v, written, pErrorCount);
|
||||
} else {
|
||||
fSeekAll = seekAllBTreeCursor(btree, v, written, pErrorCount);
|
||||
}
|
||||
if (serial) {
|
||||
wait(success(fSeekAll));
|
||||
}
|
||||
|
@ -6485,11 +6978,11 @@ TEST_CASE("!/redwood/correctness/btree") {
|
|||
state int maxKeySize = deterministicRandom()->randomInt(1, pageSize * 2);
|
||||
state int maxValueSize = randomSize(pageSize * 25);
|
||||
state int maxCommitSize = shortTest ? 1000 : randomSize(std::min<int>((maxKeySize + maxValueSize) * 20000, 10e6));
|
||||
state int mutationBytesTarget = shortTest ? 100000 : randomSize(std::min<int>(maxCommitSize * 100, 100e6));
|
||||
state int mutationBytesTarget = shortTest ? 100000 : randomSize(std::min<int>(maxCommitSize * 100, pageSize * 100000));
|
||||
state double clearProbability = deterministicRandom()->random01() * .1;
|
||||
state double clearSingleKeyProbability = deterministicRandom()->random01();
|
||||
state double clearPostSetProbability = deterministicRandom()->random01() * .1;
|
||||
state double coldStartProbability = pagerMemoryOnly ? 0 : deterministicRandom()->random01();
|
||||
state double coldStartProbability = pagerMemoryOnly ? 0 : (deterministicRandom()->random01() * 0.3);
|
||||
state double advanceOldVersionProbability = deterministicRandom()->random01();
|
||||
state double maxDuration = 60;
|
||||
state int64_t cacheSizeBytes =
|
||||
|
|
Loading…
Reference in New Issue