Forward range reads are done.

This commit is contained in:
Stephen Atherton 2017-09-15 17:27:13 -07:00
parent 125d8168b4
commit 15622f026e
2 changed files with 82 additions and 100 deletions

View File

@ -29,13 +29,14 @@
//#define REDWOOD_DEBUG 1
#ifdef REDWOOD_DEBUG
#define debug_printf(args...) fprintf(stderr, args)
#define debug_printf_always(args...) fprintf(stdout, args)
#if REDWOOD_DEBUG
#define debug_printf debug_printf_always
#else
#define debug_printf(...)
#endif
#define debug_printf_always(args...) fprintf(stderr, args)
typedef uint32_t LogicalPageID; // uint64_t?

View File

@ -623,13 +623,13 @@ private:
// TODO: If a mutation set is equal to the previous existing value of the key, don't write it.
// Output mutations for the mutation boundary start key
while(iMutations != iMutationsEnd) {
if(iMutations->first < minVersion || minVersion == invalidVersion)
minVersion = iMutations->first;
const SingleKeyMutation &m = iMutations->second;
// ( (page_size - map_overhead) / min_kvpairs_per_leaf ) - kvpair_overhead_est - keybytes
int maxPartSize = ((self->m_pageSize - 1 - 4) / 3) - 21 - iMutationBoundary->first.size();
ASSERT(maxPartSize > 0);
if(m.isClear() || m.value.size() < maxPartSize) {
if(iMutations->first < minVersion || minVersion == invalidVersion)
minVersion = iMutations->first;
merged.push_back(iMutations->second.toKVV(iMutationBoundary->first, iMutations->first).pack());
debug_printf("%s: Added %s [mutation, boundary start]\n", printPrefix.c_str(), KeyVersionValue::unpack(merged.back()).toString().c_str());
}
@ -640,6 +640,8 @@ private:
while(bytesLeft > 0) {
int partSize = std::min(bytesLeft, maxPartSize);
kvv.value = Key(m.value.substr(kvv.valueIndex, partSize), m.value.arena());
if(iMutations->first < minVersion || minVersion == invalidVersion)
minVersion = iMutations->first;
merged.push_back(kvv.pack());
debug_printf("%s: Added %s [mutation, boundary start]\n", printPrefix.c_str(), KeyVersionValue::unpack(merged.back()).toString().c_str());
kvv.valueIndex += partSize;
@ -702,7 +704,6 @@ private:
return VersionedChildrenT({ {0,{{lowerBoundKey,root}}} });
}
// TODO: Make version and key splits based on contents of merged list
IPager *pager = self->m_pager;
@ -857,7 +858,7 @@ private:
logicalPages.push_back( self->m_pager->allocateLogicalPage() );
// Write each page using its assigned page ID
debug_printf("%s Writing internal pages\n", printPrefix.c_str());
debug_printf("%s Writing %d internal pages\n", printPrefix.c_str(), pages.size());
for(int i=0; i<pages.size(); i++)
self->writePage( logicalPages[i], pages[i].second, version );
@ -917,6 +918,7 @@ private:
// InternalCursor is for seeking to and iterating over the low level records in the tree
// which combine in groups of 1 or more to represent user visible KV pairs.
// TODO: InternalCursor does not need to be reference counted.
class InternalCursor : public ReferenceCounted<InternalCursor> {
public:
InternalCursor() {}
@ -931,6 +933,12 @@ private:
return c;
}
void swap(InternalCursor &o) {
m_path.swap(o.m_path);
std::swap(kvv, o.kvv);
std::swap(m_pages, o.m_pages);
}
bool valid() const {
return kvv.valid();
}
@ -1161,44 +1169,7 @@ private:
}
if(cmp > 0) {
debug_printf("find: GTE path\n");
// Move forward record by record to find the next record which is the latest version
// of its key at or before m_version and is present. To determine that we've reached
// such a record we have to go one record "too far".
while(1) {
// While version is greater than the target or the value is not present, move forward
while(icur->valid() && (icur->kvv.version > self->m_version || !icur->kvv.value.present()))
Void _ = wait(icur->move(true));
// If we reached the end of the database, we're done.
if(!icur->kvv.valid()) {
self->m_kv = Optional<KeyValueRef>();
return Void();
}
// Now we are at a record with a version we might be able to return. First,
// we must move forward until either the end of the tree, a version too new,
// or a different key is found.
while(icur->valid() && icur->kvv.version <= self->m_version && icur->kvv.key == icur->kvv.key) {
Void _ = wait(icur->move(true));
}
// Save icur as inext as that's where we'll start reading for the next record
state Reference<InternalCursor> inext = icur->clone();
// Now we have to back up one record
Void _ = wait(icur->move(false));
ASSERT(icur->valid());
// If the value exists, return it.
if(icur->kvv.value.present()) {
Void _ = wait(self->readFullKVPair(self));
icur = inext;
inext.clear();
return Void();
}
}
Void _ = wait(self->next(true)); // TODO: Have a needValue argument here
}
if(cmp < 0) {
@ -1210,42 +1181,34 @@ private:
// Precondition: The internal cursor should be just past the last returned/found record.
ACTOR static Future<Void> next_impl(Reference<Cursor> self, bool needValue) {
state Reference<InternalCursor> &icur = self->m_icursor;
state Reference<InternalCursor> &i = self->m_icursor;
// Move forward record by record to find the next record which is the latest version
// of its key at or before m_version and is present. To determine that we've reached
// such a record we have to go one record "too far".
debug_printf("next(): cursor %s\n", i->toString().c_str());
state Version v = self->m_pagerSnapshot->getVersion();
state Reference<InternalCursor> iLast;
while(1) {
// While version is greater than the target or the value is not present, move forward
while(icur->valid() && (icur->kvv.version > self->m_version || !icur->kvv.value.present()))
Void _ = wait(icur->move(true));
// If we reached the end of the database, we're done.
if(!icur->kvv.valid()) {
self->m_kv = Optional<KeyValueRef>();
return Void();
}
// Now we are at a record with a version we might be able to return. First,
// we must move forward until either the end of the tree, a version too new,
// or a different key is found.
while(icur->valid() && icur->kvv.version <= self->m_version && icur->kvv.key == icur->kvv.key) {
Void _ = wait(icur->move(true));
}
// Save icur as inext as that's where we'll start reading for the next record
state Reference<InternalCursor> inext = icur->clone();
// Now we have to back up one record
Void _ = wait(icur->move(false));
// If the value exists, return it.
if(icur->kvv.value.present()) {
Void _ = wait(self->readFullKVPair(self));
icur = inext;
iLast = i->clone();
if(!i->valid())
break;
Void _ = wait(i->move(true));
if(iLast->kvv.version <= v
&& iLast->kvv.value.present()
&& (
!i->valid()
|| i->kvv.key != iLast->kvv.key
|| i->kvv.version > v
)
) {
std::swap(i, iLast);
Void _ = wait(readFullKVPair(self));
std::swap(i, iLast);
return Void();
}
}
self->m_kv = Optional<KeyValueRef>();
return Void();
}
// Read all of the current value, if it is split across multiple kv pairs, and set m_kv.
@ -1324,60 +1287,78 @@ KeyValue randomKV(int keySize = 10, int valueSize = 5) {
ACTOR Future<int> randomRangeVerify(VersionedBTree *btree, Version v, std::map<std::pair<std::string, Version>, Optional<std::string>> *written) {
state int errors = 0;
state Key start = LiteralStringRef("a");//randomKV().key;
state Key end = LiteralStringRef("z"); //randomKV().key;
state Key start = randomKV().key;
state Key end = randomKV().key;
if(end <= start)
end = keyAfter(start);
printf("Range read %s to %s @%lld\n", start.toString().c_str(), end.toString().c_str(), v);
debug_printf("VerifyRange %s to %s @%lld\n", start.toString().c_str(), end.toString().c_str(), v);
state Reference<IStoreCursor> cur = btree->readAtVersion(v);
state std::map<std::pair<std::string, Version>, Optional<std::string>>::const_iterator i = written->lower_bound(std::make_pair(start.toString(), v));
state std::map<std::pair<std::string, Version>, Optional<std::string>>::const_iterator iEnd = written->upper_bound(std::make_pair(end.toString(), 0));
state std::map<std::pair<std::string, Version>, Optional<std::string>>::const_iterator iLast;
Void _ = wait(cur->findFirstEqualOrGreater(start, 0));
while(cur->isValid() && cur->getKey() < end) {
debug_printf("(%s, %s) %s -> %d\n", start.toString().c_str(), end.toString().c_str(), cur->getKey().toString().c_str(), cur->getValue().size());
// Find the next written kv pair that would be present at this version
state std::map<std::pair<std::string, Version>, Optional<std::string>>::const_iterator iLast = i;
while(1) {
while(i != iEnd) {
printf("written: ver %lld key %s val %d bytes\n",
i->first.second,
i->first.first.c_str(),
i->second.present() ? i->second.get().size() : -1);
++i;
if(i == iEnd)
break;
if(i->first.first != iLast->first.first)
break;
if(i->first.second > v)
break;
iLast = i;
}
if(iLast->first.second <= v && iLast->second.present())
break;
iLast = i;
if(i == iEnd)
break;
++i;
if(iLast->first.second <= v
&& iLast->second.present()
&& (
i == iEnd
|| i->first.first != iLast->first.first
|| i->first.second > v
)
)
break;
}
if(iLast == iEnd) {
errors += 1;
printf("Range check: Tree key '%s' vs nothing in written map.\n", cur->getKey().toString().c_str());
printf("VerifyRange(@%lld, %s, %s) failed: Tree key '%s' vs nothing in written map.\n", v, start.toString().c_str(), end.toString().c_str(), cur->getKey().toString().c_str());
break;
}
if(cur->getKey() != iLast->first.first) {
errors += 1;
printf("Range check: Tree key '%s' vs written '%s'\n", cur->getKey().toString().c_str(), iLast->first.first.c_str());
printf("VerifyRange(@%lld, %s, %s) failed: Tree key '%s' vs written '%s'\n", v, start.toString().c_str(), end.toString().c_str(), cur->getKey().toString().c_str(), iLast->first.first.c_str());
break;
}
if(cur->getValue() != iLast->second.get()) {
errors += 1;
printf("Range check: Tree key '%s' has tree value '%s' vs written '%s'\n", cur->getKey().toString().c_str(), cur->getValue().toString().c_str(), iLast->second.get().c_str());
printf("VerifyRange(@%lld, %s, %s) failed: Tree key '%s' has tree value '%s' vs written '%s'\n", v, start.toString().c_str(), end.toString().c_str(), cur->getKey().toString().c_str(), cur->getValue().toString().c_str(), iLast->second.get().c_str());
break;
}
Void _ = wait(cur->next(true));
}
// Make sure there are no further written kv pairs that would be present at this version.
while(1) {
iLast = i;
if(i == iEnd)
break;
++i;
if(iLast->first.second <= v
&& iLast->second.present()
&& (
i == iEnd
|| i->first.first != iLast->first.first
|| i->first.second > v
)
)
break;
}
if(iLast != iEnd) {
errors += 1;
printf("VerifyRange(@%lld, %s, %s) failed: Tree range ended but written has @%lld '%s'\n", v, start.toString().c_str(), end.toString().c_str(), iLast->first.second, iLast->first.first.c_str());
}
return errors;
}