Iterating over and coalescing the “internal” records (not user visible, tuple of (key, version, valueIndex, value)) now works forward and backward, and findEqual() now works using this to read split values fully. This is also most of the work needed for range reads.

This commit is contained in:
Stephen Atherton 2017-09-09 01:29:25 -07:00
parent b5f79d27f7
commit 919a3d1740
2 changed files with 142 additions and 21 deletions

View File

@ -536,6 +536,7 @@ ACTOR Future<Reference<const IPage>> getPageImpl(IndirectShadowPager *pager, Ref
Void _ = wait(pager->writeActors.signalAndCollapse());
state uint8_t *buf = new (snapshot->arena) uint8_t[IndirectShadowPage::PAGE_BYTES];
// TODO: Use readZeroCopy but fall back ton read(). Releasing pages should releaseZeroCopy for successful zero copy reads
int read = wait(pager->dataFile->read(buf, IndirectShadowPage::PAGE_BYTES, physicalPageID * IndirectShadowPage::PAGE_BYTES));
ASSERT(read == IndirectShadowPage::PAGE_BYTES);

View File

@ -51,7 +51,7 @@ struct SimpleFixedSizeMapRef {
};
// Returns -1 if key is less than first key, otherwise index into entries
int findLastLessOrEqual(StringRef key) {
int findLastLessOrEqual(StringRef key) const {
return std::upper_bound(entries.begin(), entries.end(),
key,
[](StringRef const& a, KVPairsT::value_type const& b) { return a < b.key; })
@ -925,8 +925,10 @@ private:
}
virtual ~Cursor() {}
virtual Future<Void> findEqual(KeyRef key) { return find_impl(Reference<Cursor>::addRef(this), key, 0); }
virtual Future<Void> findFirstGreaterOrEqual(KeyRef key, int prefetchNextBytes) NOT_IMPLEMENTED
virtual Future<Void> findLastLessOrEqual(KeyRef key, int prefetchPriorBytes) NOT_IMPLEMENTED
virtual Future<Void> next(bool needValue) NOT_IMPLEMENTED
virtual Future<Void> prev(bool needValue) NOT_IMPLEMENTED
@ -948,14 +950,78 @@ private:
Version m_version;
Reference<IPagerSnapshot> m_pager;
Optional<KeyValueRef> m_kv;
Optional<KeyValueRef> m_kv; // The current user-level key/value in the tree
Arena m_arena;
LogicalPageID m_root;
KeyVersionValue m_current; // The current internal record in the tree
struct PageEntryLocation {
PageEntryLocation() : index(-1) {}
Reference<const IPage> page;
FixedSizeMap map;
int index;
};
std::vector<PageEntryLocation> m_path;
void addref() { ReferenceCounted<Cursor>::addref(); }
void delref() { ReferenceCounted<Cursor>::delref(); }
ACTOR static Future<Void> findEqual_impl(Reference<Cursor> self, KeyRef key) {
// Move one 'internal' key/value/version/valueindex/value record.
// Iterating with this funciton will "see" all parts of all values and clears at all versions.
ACTOR static Future<Void> moveInternal(Reference<Cursor> self, bool fwd) {
state std::vector<PageEntryLocation> &path = self->m_path;
state KeyVersionValue &kvv = self->m_current;
// Try to move to the next index at the bottom of path.
// If the next index is out of bounds, pop last path location and try again.
while(!path.empty()) {
PageEntryLocation &loc = path.back();
if(fwd)
++loc.index;
else
--loc.index;
// If we've moved too far then pop the last path page, otherwise we're done
// retracing the path.
if(fwd ? (loc.index >= loc.map.entries.size()) : (loc.index < 0)) {
path.pop_back();
}
else
break;
}
// If the path is empty then we've reached an end of the database.
if(path.empty()) {
kvv = KeyVersionValue();
return Void();
}
// Now, traverse downward to a leaf.
while(!(path.back().map.flags & EPageFlags::IS_LEAF)) {
PageEntryLocation &last = path.back();
state LogicalPageID pageNumber = *(uint32_t *)last.map.entries[last.index].value.begin();
debug_printf("moveInternal(%d): Reading page %d\n", fwd, pageNumber);
Reference<const IPage> rawPage = wait(self->m_pager->getPhysicalPage(pageNumber));
path.emplace_back();
PageEntryLocation &next = path.back();
next.page = rawPage;
next.map = FixedSizeMap::decode(StringRef(rawPage->begin(), rawPage->size()));
debug_printf("Read page %d @%lld: %s\n", pageNumber, self->m_version, next.map.toString().c_str());
next.index = fwd ? 0 : next.map.entries.size() - 1;
}
PageEntryLocation &cur = path.back();
kvv = KeyVersionValue::unpack(cur.map.entries[cur.index]);
return Void();
}
// find key in tree closest to or equal to key.
// for less than or equal use cmp < 0
// for greater than or equal use cmp > 0
// for equal use cmp == 0
ACTOR static Future<Void> find_impl(Reference<Cursor> self, KeyRef key, int cmp) {
state std::vector<PageEntryLocation> &path = self->m_path;
state LogicalPageID pageNumber = self->m_root;
state KeyVersionValue target(key, self->m_version, 0);
@ -964,39 +1030,93 @@ private:
loop {
debug_printf("findEqual: Reading page %d\n", pageNumber);
Reference<const IPage> rawPage = wait(self->m_pager->getPhysicalPage(pageNumber));
FixedSizeMap map = FixedSizeMap::decode(StringRef(rawPage->begin(), rawPage->size()));
debug_printf("Read page %d @%lld: %s\n", pageNumber, self->m_version, map.toString().c_str());
path.emplace_back();
PageEntryLocation &loc = path.back();
loc.page = rawPage;
loc.map = FixedSizeMap::decode(StringRef(rawPage->begin(), rawPage->size()));
debug_printf("Read page %d @%lld: %s\n", pageNumber, self->m_version, loc.map.toString().c_str());
// Special case of empty page (which should only happen for root)
if(map.entries.empty()) {
if(loc.map.entries.empty()) {
ASSERT(pageNumber == self->m_root);
self->m_kv = Optional<KeyValueRef>();
return Void();
}
if(map.flags && EPageFlags::IS_LEAF) {
int i = map.findLastLessOrEqual(targetPacked);
if(i >= 0) {
KeyVersionValue kvv = KeyVersionValue::unpack(map.entries[i]);
if(key == kvv.key && kvv.value.present()) {
self->m_kv = KeyValueRef(StringRef(self->m_arena, key), StringRef(self->m_arena, kvv.value.get()));
return Void();
if(loc.map.flags & EPageFlags::IS_LEAF) {
if(cmp == 0) {
loc.index = loc.map.findLastLessOrEqual(targetPacked);
if(loc.index >= 0) {
self->m_current = KeyVersionValue::unpack(loc.map.entries[loc.index]);
if(key == self->m_current.key && self->m_current.value.present()) {
Void _ = wait(self->readFullKVPair(self));
return Void();
}
}
}
self->m_kv = Optional<KeyValueRef>();
return Void();
self->m_kv = Optional<KeyValueRef>();
return Void();
}
ASSERT(false);
}
else {
int i = map.findLastLessOrEqual(targetPacked);
i = std::max(i, 0);
pageNumber = *(uint32_t *)map.entries[i].value.begin();
loc.index = loc.map.findLastLessOrEqual(targetPacked);
loc.index = std::max(loc.index, 0);
pageNumber = *(uint32_t *)loc.map.entries[loc.index].value.begin();
}
}
}
virtual Future<Void> findEqual(KeyRef key) {
return findEqual_impl(Reference<Cursor>::addRef(this), key);
// Read all of the current value, if it is split across multiple kv pairs, and set m_kv.
// m_current must be at either the first or the last value part.
ACTOR static Future<Void> readFullKVPair(Reference<Cursor> self) {
state KeyVersionValue &kvv = self->m_current;
// Initialize the optional kv to a KeyValueRef and get a reference to it.
state KeyValueRef &kv = (self->m_kv = KeyValueRef()).get();
kv.key = KeyRef(self->m_arena, kvv.key);
state Version valueVersion = kvv.version;
// Unsplit value
if(kvv.valueIndex == -1) {
ASSERT(kvv.value.present());
kv.value = ValueRef(self->m_arena, kvv.value.get());
return Void();
}
// If we find a 0 valueindex then we're at the first segment and must iterate forward
if(kvv.valueIndex == 0) {
// TODO: Performance, lots of extra copying is happening.
state std::vector<Value> parts;
state int size = 0;
while(1) {
ASSERT(kvv.value.present());
parts.push_back(kvv.value.get());
size += kvv.value.get().size();
Void _ = wait(self->moveInternal(self, true));
if(!kvv.valid() || kvv.version != valueVersion || kvv.key != kv.key)
break;
}
kv.value = makeString(size, self->m_arena);
uint8_t *wptr = mutateString(kv.value);
for(const Value &part : parts) {
memcpy(wptr, part.begin(), part.size());
wptr += part.size();
}
return Void();
}
// If we find any other valueIndex we must assume we're at the last segment and must iterate backward.
ASSERT(kvv.value.present());
kv.value = makeString(kvv.valueIndex + kvv.value.get().size());
while(1) {
ASSERT(kvv.value.present());
memcpy(mutateString(kv.value) + kvv.valueIndex, kvv.value.get().begin(), kvv.value.get().size());
Void _ = wait(self->moveInternal(self, false));
if(!kvv.valid() || kvv.key != kv.key)
break;
}
return Void();
}
};
};