Some cleanup. InternalCursor does not need to be reference counted.

This commit is contained in:
Stephen Atherton 2017-09-16 02:09:09 -07:00
parent c89b45158a
commit 1aae32dc16
1 changed files with 31 additions and 47 deletions

View File

@ -919,36 +919,23 @@ private:
// InternalCursor is for seeking to and iterating over the low level records in the tree
// which combine in groups of 1 or more to represent user visible KV pairs.
// TODO: InternalCursor does not need to be reference counted.
class InternalCursor : public ReferenceCounted<InternalCursor> {
class InternalCursor {
public:
InternalCursor() {}
InternalCursor(Reference<IPagerSnapshot> pages, LogicalPageID root) : m_pages(pages), m_root(root) {
m_path.reserve(6);
}
Reference<InternalCursor> clone() const {
Reference<InternalCursor> c(new InternalCursor(m_pages, m_root));
c->m_path = m_path;
c->kvv = kvv;
return c;
}
void swap(InternalCursor &o) {
m_path.swap(o.m_path);
std::swap(kvv, o.kvv);
std::swap(m_pages, o.m_pages);
}
bool valid() const {
return kvv.valid();
}
Future<Void> seekLessThanOrEqual(Key key) {
return seekLessThanOrEqual_impl(Reference<InternalCursor>::addRef(this), key);
return seekLessThanOrEqual_impl(this, key);
}
Future<Void> move(bool fwd) {
return move_impl(Reference<InternalCursor>::addRef(this), fwd);
return move_impl(this, fwd);
}
KeyVersionValue kvv; // The decoded current internal record in the tree
@ -957,9 +944,6 @@ private:
return format("cursor(0x%p) %s %s", this, pathToString().c_str(), valid() ? kvv.toString().c_str() : "<>");
}
void addref() { ReferenceCounted<InternalCursor>::addref(); }
void delref() { ReferenceCounted<InternalCursor>::delref(); }
private:
Reference<IPagerSnapshot> m_pages;
LogicalPageID m_root;
@ -978,14 +962,14 @@ private:
typedef std::vector<PageEntryLocation> TraversalPathT;
TraversalPathT m_path;
ACTOR static Future<Void> pushPage(Reference<InternalCursor> self, LogicalPageID id) {
ACTOR static Future<Void> pushPage(InternalCursor *self, LogicalPageID id) {
Reference<const IPage> rawPage = wait(self->m_pages->getPhysicalPage(id));
self->m_path.emplace_back(rawPage, id);
//debug_printf("pushPage(): Read page %d @%lld:\n%s\n", id, self->m_pages->getVersion(), self->m_path.back().map.toString(false, id, self->m_pages->getVersion()).c_str());
return Void();
}
ACTOR static Future<Void> reset(Reference<InternalCursor> self) {
ACTOR static Future<Void> reset(InternalCursor *self) {
if(self->m_path.empty()) {
Void _ = wait(pushPage(self, self->m_root));
}
@ -995,7 +979,7 @@ private:
return Void();
}
ACTOR static Future<Void> seekLessThanOrEqual_impl(Reference<InternalCursor> self, Key key) {
ACTOR static Future<Void> seekLessThanOrEqual_impl(InternalCursor *self, Key key) {
state TraversalPathT &path = self->m_path;
Void _ = wait(reset(self));
@ -1044,7 +1028,7 @@ private:
// Move one 'internal' key/value/version/valueindex/value record.
// Iterating with this funciton will "see" all parts of all values and clears at all versions.
ACTOR static Future<Void> move_impl(Reference<InternalCursor> self, bool fwd) {
ACTOR static Future<Void> move_impl(InternalCursor *self, bool fwd) {
state TraversalPathT &path = self->m_path;
state const char *dir = fwd ? "forward" : "backward";
@ -1095,7 +1079,7 @@ private:
class Cursor : public IStoreCursor, public ReferenceCounted<Cursor> {
public:
Cursor(Version version, IPager *pager, LogicalPageID root)
: m_version(version), m_pagerSnapshot(pager->getReadSnapshot(version)), m_icursor(new InternalCursor(m_pagerSnapshot, root)) {
: m_version(version), m_pagerSnapshot(pager->getReadSnapshot(version)), m_icursor(m_pagerSnapshot, root) {
}
virtual ~Cursor() {}
@ -1128,7 +1112,7 @@ private:
private:
Version m_version;
Reference<IPagerSnapshot> m_pagerSnapshot;
Reference<InternalCursor> m_icursor;
InternalCursor m_icursor;
Optional<KeyValueRef> m_kv; // The current user-level key/value in the tree
Arena m_arena;
@ -1137,17 +1121,17 @@ private:
// for greater than or equal use cmp > 0
// for equal use cmp == 0
ACTOR static Future<Void> find_impl(Reference<Cursor> self, KeyRef key, int cmp) {
state Reference<InternalCursor> &icur = self->m_icursor;
state InternalCursor &icur = self->m_icursor;
// Search for the last key at or before (key, version, max_value_index)
state KeyVersionValue target(key, self->m_version, std::numeric_limits<int>::max());
state Key record = target.pack().key;
Void _ = wait(icur->seekLessThanOrEqual(record));
Void _ = wait(icur.seekLessThanOrEqual(record));
// If we found the target key, return it as it is valid for any cmp option
if(icur->valid() && icur->kvv.value.present() && icur->kvv.key == key) {
debug_printf("Reading full kv pair starting from: %s\n", icur->kvv.toString().c_str());
if(icur.valid() && icur.kvv.value.present() && icur.kvv.key == key) {
debug_printf("Reading full kv pair starting from: %s\n", icur.kvv.toString().c_str());
Void _ = wait(self->readFullKVPair(self));
return Void();
}
@ -1162,8 +1146,8 @@ private:
if(cmp > 0) {
// icur is at a record < key, possibly before the start of the tree so move forward at least once.
loop {
Void _ = wait(icur->move(true));
if(!icur->valid() || icur->kvv.key > key)
Void _ = wait(icur.move(true));
if(!icur.valid() || icur.kvv.key > key)
break;
}
// Get the next present key at the target version. Handles invalid cursor too.
@ -1177,30 +1161,30 @@ private:
}
ACTOR static Future<Void> next_impl(Reference<Cursor> self, bool needValue) {
state Reference<InternalCursor> &i = self->m_icursor;
state InternalCursor &i = self->m_icursor;
debug_printf("next(): cursor %s\n", i->toString().c_str());
debug_printf("next(): cursor %s\n", i.toString().c_str());
// Make sure we are one record past the last user key
if(self->m_kv.present()) {
while(i->valid() && i->kvv.key <= self->m_kv.get().key) {
Void _ = wait(i->move(true));
while(i.valid() && i.kvv.key <= self->m_kv.get().key) {
Void _ = wait(i.move(true));
}
}
state Version v = self->m_pagerSnapshot->getVersion();
state Reference<InternalCursor> iLast;
state InternalCursor iLast;
while(1) {
iLast = i->clone();
if(!i->valid())
iLast = i;
if(!i.valid())
break;
Void _ = wait(i->move(true));
if(iLast->kvv.version <= v
&& iLast->kvv.value.present()
Void _ = wait(i.move(true));
if(iLast.kvv.version <= v
&& iLast.kvv.value.present()
&& (
!i->valid()
|| i->kvv.key != iLast->kvv.key
|| i->kvv.version > v
!i.valid()
|| i.kvv.key != iLast.kvv.key
|| i.kvv.version > v
)
) {
// Assume that next is the most likely next move, so save the one-too-far cursor position.
@ -1218,7 +1202,7 @@ private:
// Read all of the current value, if it is split across multiple kv pairs, and set m_kv.
// m_current must be at either the first or the last value part.
ACTOR static Future<Void> readFullKVPair(Reference<Cursor> self) {
state KeyVersionValue &kvv = self->m_icursor->kvv;
state KeyVersionValue &kvv = self->m_icursor.kvv;
// Initialize the optional kv to a KeyValueRef and get a reference to it.
state KeyValueRef &kv = (self->m_kv = KeyValueRef()).get();
kv.key = KeyRef(self->m_arena, kvv.key);
@ -1241,7 +1225,7 @@ private:
ASSERT(kvv.value.present());
parts.push_back(kvv.value.get());
size += kvv.value.get().size();
Void _ = wait(self->m_icursor->move(true));
Void _ = wait(self->m_icursor.move(true));
if(!kvv.valid() || kvv.version != valueVersion || kvv.key != kv.key)
break;
}
@ -1265,7 +1249,7 @@ private:
memcpy(mutateString(kv.value) + kvv.valueIndex, kvv.value.get().begin(), kvv.value.get().size());
if(kvv.valueIndex == 0)
break;
Void _ = wait(self->m_icursor->move(false));
Void _ = wait(self->m_icursor.move(false));
}
return Void();