Checkpointing progress on large rewrite of how mutations are stored and applied. Not working yet.
This commit is contained in:
parent
0b817f95f2
commit
888093463b
|
@ -115,18 +115,69 @@ struct SimpleFixedSizeMapRef {
|
|||
return pages;
|
||||
}
|
||||
|
||||
std::string toString();
|
||||
std::string toString() const;
|
||||
|
||||
KVPairsT entries;
|
||||
uint8_t flags;
|
||||
};
|
||||
|
||||
struct KeyVersionValue {
|
||||
KeyVersionValue() : version(invalidVersion) {}
|
||||
KeyVersionValue(Key k, Version ver, Optional<Value> val = Optional<Value>()) : key(k), version(ver), value(val) {}
|
||||
bool operator< (KeyVersionValue const &rhs) const {
|
||||
int64_t cmp = key.compare(rhs.key);
|
||||
if(cmp == 0) {
|
||||
cmp = version - rhs.version;
|
||||
if(cmp == 0)
|
||||
return false;
|
||||
}
|
||||
return cmp < 0;
|
||||
}
|
||||
Key key;
|
||||
Version version;
|
||||
Optional<Value> value;
|
||||
|
||||
bool valid() { return version != invalidVersion; }
|
||||
|
||||
inline KeyValue pack() const {
|
||||
Tuple k;
|
||||
k.append(key);
|
||||
k.append(version);
|
||||
Tuple v;
|
||||
if(value.present())
|
||||
v.append(value.get());
|
||||
else
|
||||
v.appendNull();
|
||||
return KeyValueRef(k.pack(), v.pack());
|
||||
}
|
||||
static inline KeyVersionValue unpack(KeyValueRef kv) {
|
||||
Tuple k = Tuple::unpack(kv.key);
|
||||
if(kv.value.size() == 0)
|
||||
return KeyVersionValue(k.getString(0), k.getInt(1));
|
||||
Tuple v = Tuple::unpack(kv.value);
|
||||
return KeyVersionValue(k.getString(0), k.getInt(1), v.getType(0) == Tuple::NULL_TYPE ? Optional<Value>() : v.getString(0));
|
||||
}
|
||||
|
||||
static inline KeyVersionValue unpack(KeyRef key) {
|
||||
debug_printf("Unpacking '%s'\n", printable(key).c_str());
|
||||
if(key.size() == 0)
|
||||
return KeyVersionValue(KeyRef(), 0);
|
||||
return unpack(KeyValueRef(key, ValueRef()));
|
||||
}
|
||||
|
||||
std::string toString() const {
|
||||
return format("'%s' -> '%s' @%lld", key.toString().c_str(), value.present() ? value.get().toString().c_str() : "<cleared>", version);
|
||||
}
|
||||
};
|
||||
|
||||
#define NOT_IMPLEMENTED { UNSTOPPABLE_ASSERT(false); }
|
||||
|
||||
class VersionedBTree : public IVersionedStore {
|
||||
public:
|
||||
enum EPageFlags { IS_LEAF = 1};
|
||||
static Key endKey;
|
||||
|
||||
// The end key for the entire tree
|
||||
static KeyVersionValue endKey;
|
||||
|
||||
typedef SimpleFixedSizeMapRef FixedSizeMap;
|
||||
|
||||
|
@ -145,12 +196,32 @@ public:
|
|||
virtual void set(KeyValueRef keyValue) {
|
||||
ASSERT(m_writeVersion != invalidVersion);
|
||||
|
||||
applyMutation(new Mutation(m_writeVersion, keyValue.key, keyValue.value, MutationRef::SetValue));
|
||||
SingleKeyMutationsByVersion &changes = insertMutationBoundary(keyValue.key)->second.startKeyMutations;
|
||||
|
||||
// Add the set if the changes set is empty or the last entry isn't a set to exactly the same value
|
||||
if(changes.empty() || !changes.rbegin()->second.equalToSet(keyValue.value))
|
||||
changes[m_writeVersion] = SingleKeyMutation(keyValue.value);
|
||||
}
|
||||
virtual void clear(KeyRangeRef range) {
|
||||
ASSERT(m_writeVersion != invalidVersion);
|
||||
|
||||
applyMutation(new Mutation(m_writeVersion, range.begin, range.end, MutationRef::ClearRange));
|
||||
MutationBufferT::iterator iBegin = insertMutationBoundary(range.begin);
|
||||
MutationBufferT::iterator iEnd = insertMutationBoundary(range.end);
|
||||
|
||||
// For each boundary in the cleared range
|
||||
while(iBegin != iEnd) {
|
||||
RangeMutation &range = iBegin->second;
|
||||
|
||||
// Set the rangeClearedVersion if not set
|
||||
if(!range.rangeClearVersion.present())
|
||||
range.rangeClearVersion = m_writeVersion;
|
||||
|
||||
// Add a clear to the startKeyMutations map if it's empty or the last item is not a clear
|
||||
if(range.startKeyMutations.empty() || !range.startKeyMutations.rbegin()->second.isClear())
|
||||
range.startKeyMutations[m_writeVersion] = SingleKeyMutation();
|
||||
|
||||
++iBegin;
|
||||
}
|
||||
}
|
||||
|
||||
virtual void mutate(int op, StringRef param1, StringRef param2) NOT_IMPLEMENTED
|
||||
|
@ -182,14 +253,13 @@ public:
|
|||
Void _ = wait(self->m_pager->commit());
|
||||
}
|
||||
self->m_lastCommittedVersion = latest;
|
||||
self->initMutationBuffer();
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<Void> init() { return init(this); }
|
||||
|
||||
virtual ~VersionedBTree() {
|
||||
for(Mutation *m : m_mutations)
|
||||
delete m;
|
||||
}
|
||||
|
||||
// readAtVersion() may only be called on a version which has previously been passed to setWriteVersion() and never previously passed
|
||||
|
@ -229,215 +299,159 @@ private:
|
|||
typedef std::pair<Version, std::vector<KeyPagePairT>> VersionedKeyToPageSetT;
|
||||
typedef std::vector<VersionedKeyToPageSetT> VersionedChildrenT;
|
||||
|
||||
struct Mutation {
|
||||
Mutation(Version ver, Key key, Value valueOrEnd, MutationRef::Type op) : version(ver), key(key), valueOrEnd(valueOrEnd), op(op) {}
|
||||
Version version;
|
||||
Key key;
|
||||
Value valueOrEnd;
|
||||
// Represents a change to a single key - set, clear, or atomic op
|
||||
struct SingleKeyMutation {
|
||||
// Clear
|
||||
SingleKeyMutation() : op(MutationRef::ClearRange) {}
|
||||
// Set
|
||||
SingleKeyMutation(Value val) : op(MutationRef::SetValue), value(val) {}
|
||||
// Atomic Op
|
||||
SingleKeyMutation(MutationRef::Type op, Value val) : op(op), value(val) {}
|
||||
|
||||
MutationRef::Type op;
|
||||
Value value;
|
||||
|
||||
inline bool isClear() const { return op == MutationRef::ClearRange; }
|
||||
inline bool isSet() const { return op == MutationRef::SetValue; }
|
||||
inline bool isAtomicOp() const { return !isClear() && !isSet(); }
|
||||
inline bool isAtomicOp() const { return !isSet() && !isClear(); }
|
||||
|
||||
struct byVersion { bool operator() (const Mutation *a, const Mutation *b) { return a->version < b->version; } };
|
||||
struct byKey { bool operator() (const Mutation *a, const Mutation *b) { return a->key < b->key; } };
|
||||
inline bool equalToSet(ValueRef val) { return isSet() && value == val; }
|
||||
|
||||
std::string toString() {
|
||||
return format("@%lld op=%d key=%s value_or_end=%s", version, op, printable(key).c_str(), printable(valueOrEnd).c_str());
|
||||
}
|
||||
};
|
||||
KeyVersionValue toKVV(Key key, Version version) const {
|
||||
ASSERT(!isAtomicOp());
|
||||
|
||||
typedef std::set<Mutation *, Mutation::byVersion> MutationsByVersionT;
|
||||
typedef std::set<Mutation *, Mutation::byKey> MutationsByKeyT;
|
||||
if(isClear())
|
||||
return KeyVersionValue(key, version);
|
||||
|
||||
typedef std::map<Key, MutationsByVersionT> MutationBufferT;
|
||||
typedef std::map<Version, MutationsByKeyT> MutationBufferPerVersionT;
|
||||
|
||||
/* MutationBufferT should be simplified to the following. It is more efficient to mutate,
|
||||
* has a smaller memory footprint, and may even be easier to use during commit. It does
|
||||
* not require sets to be modeled as a mutation of a range from key to keyAfter(key). It
|
||||
* loses the ability to model "range sets" of unspecified existing keys, but that doesn't
|
||||
* seem very useful anyway. This model also no longer uses the Mutation structure,
|
||||
* which could possibly be removed entirely unless there is some benefit to having it.
|
||||
*
|
||||
* typedef std::map<Key, std::pair<Optional<Version>, std::map<Version, Optional<Value>>>> MutationBufferT;
|
||||
*
|
||||
* The pair stored for a key in the buffer map represents
|
||||
* rangeClearVersion: the version at which a range starting with this key was cleared
|
||||
* individualOps: the individual ops (sets/clear/atomic) done on this key
|
||||
*
|
||||
* - Keys are inserted into the buffer map for every individual operation (set/clear/atomic)
|
||||
* key and for both the start and end of a range clear.
|
||||
* - When a new key is inserted in the buffer map it should take on the immediately previous
|
||||
* entry's range clear version.
|
||||
* - To apply a single clear, add it to the individual ops only if the last entry is not also a clear.
|
||||
* - To apply a range clear, set any unset range clear values >= start and < end.
|
||||
*
|
||||
* Example:
|
||||
*
|
||||
* Version Op
|
||||
* 1 set b
|
||||
* 2 set b
|
||||
* 3 clear bb c
|
||||
* 3 clear b
|
||||
* 4 clear a d
|
||||
* 4 set cc
|
||||
* 5 clear cc g
|
||||
*
|
||||
* Buffer state after these operations:
|
||||
* Key RangeClearVersion IndividualOps
|
||||
* a 4
|
||||
* b 4 1,set 2,set 3,clear
|
||||
* bb 3
|
||||
* c 4
|
||||
* cc 4 4,set 5,clear
|
||||
* d 5
|
||||
* gg
|
||||
*/
|
||||
|
||||
// Find or create a mutation buffer boundary for bound and return an iterator to it
|
||||
MutationBufferT::iterator insertMutationBoundary(Key bound) {
|
||||
// Find the first split point in buffer that is >= key
|
||||
MutationBufferT::iterator start = m_buffer.lower_bound(bound);
|
||||
|
||||
// If an exact match was found then we're done
|
||||
if(start != m_buffer.end() && start->first == bound)
|
||||
return start;
|
||||
|
||||
// If begin was found then bound will be the first lexically ordered boundary so we can just insert it.
|
||||
if(start == m_buffer.begin())
|
||||
return m_buffer.insert(start, {bound, {}});
|
||||
|
||||
// At this point, we know that
|
||||
// - buffer is not empty
|
||||
// - bound does not exist in buffer
|
||||
// - start points to the first thing passed bound (which could even be end())
|
||||
// Previous will refer to the start of the range we are splitting
|
||||
MutationBufferT::iterator previous = start;
|
||||
--previous;
|
||||
|
||||
// Insert the new boundary
|
||||
start = m_buffer.insert(start, {bound, {}});
|
||||
|
||||
// Copy any range mutations from previous boundary to the new one
|
||||
for(Mutation *m : previous->second)
|
||||
if(m->isClear())
|
||||
start->second.insert(m);
|
||||
|
||||
return start;
|
||||
}
|
||||
|
||||
void applyMutation(Mutation *m) {
|
||||
// Mutation pointers can exist in many places in m_buffer so rather than reference count them
|
||||
// since they will all be deleted together we'll just stash them here.
|
||||
m_mutations.push_back(m);
|
||||
|
||||
// TODO: Update mapForVersion if we're going to use it in commitSubtree, or remove it
|
||||
//auto &mapForVersion = m_buffer_byVersion[m_writeVersion];
|
||||
|
||||
// TODO: Combine both cases of these if's as they are now nearly identical except for mutation repeat check and boundary selection
|
||||
if(m->isClear()) {
|
||||
auto start = insertMutationBoundary(m->key);
|
||||
auto end = insertMutationBoundary(m->valueOrEnd);
|
||||
while(start != end) {
|
||||
auto &mutationSet = start->second;
|
||||
auto im = mutationSet.end();
|
||||
bool skip = false;
|
||||
// If mutationSet has stuff in it, see if the new mutation replaces or
|
||||
// combines with the last entry in mutationSet.
|
||||
if(im != mutationSet.begin()) {
|
||||
--im;
|
||||
// If the previous last mutation is a clear, then the new clear does nothing so we can skip it.
|
||||
if((*im)->isClear()) {
|
||||
skip = true;
|
||||
}
|
||||
else {
|
||||
// If the version is the same, erase it so we can replace it with the new mutation
|
||||
if((*im)->version == m->version)
|
||||
mutationSet.erase(im);
|
||||
}
|
||||
}
|
||||
if(!skip)
|
||||
mutationSet.insert(mutationSet.end(), m);
|
||||
++start;
|
||||
}
|
||||
}
|
||||
else {
|
||||
insertMutationBoundary(keyAfter(m->key));
|
||||
auto &mutationSet = insertMutationBoundary(m->key)->second;
|
||||
auto im = mutationSet.end();
|
||||
bool skip = false;
|
||||
if(im != mutationSet.begin()) {
|
||||
--im;
|
||||
// If the previous last mutation is a set to the same value as the new one then we can skip the new mutation as it does nothing
|
||||
if((*im)->isSet() && m->isSet() && (*im)->valueOrEnd == m->valueOrEnd) {
|
||||
skip = true;
|
||||
}
|
||||
else {
|
||||
// If the version is the same, erase it so we can replace it with the new mutation
|
||||
if((*im)->version == m->version)
|
||||
mutationSet.erase(im);
|
||||
}
|
||||
}
|
||||
if(!skip)
|
||||
mutationSet.insert(mutationSet.end(), m);
|
||||
}
|
||||
|
||||
/*
|
||||
debug_printf("-------------------------------------\n");
|
||||
debug_printf("BUFFER\n");
|
||||
for(auto &i : m_buffer) {
|
||||
debug_printf("'%s'\n", printable(i.first).c_str());
|
||||
for(auto j : i.second) {
|
||||
debug_printf("\t%s\n", j->toString().c_str());
|
||||
}
|
||||
}
|
||||
debug_printf("-------------------------------------\n");
|
||||
*/
|
||||
|
||||
}
|
||||
|
||||
struct KeyVersionValue {
|
||||
KeyVersionValue() : version(invalidVersion) {}
|
||||
KeyVersionValue(Key k, Version ver, Optional<Value> val = Optional<Value>()) : key(k), version(ver), value(val) {}
|
||||
bool operator< (KeyVersionValue const &rhs) const {
|
||||
int64_t cmp = key.compare(rhs.key);
|
||||
if(cmp == 0) {
|
||||
cmp = version - rhs.version;
|
||||
if(cmp == 0)
|
||||
return false;
|
||||
}
|
||||
return cmp < 0;
|
||||
}
|
||||
Key key;
|
||||
Version version;
|
||||
Optional<Value> value;
|
||||
|
||||
inline KeyValue pack() const {
|
||||
Tuple k;
|
||||
k.append(key);
|
||||
k.append(version);
|
||||
Tuple v;
|
||||
if(value.present())
|
||||
v.append(value.get());
|
||||
else
|
||||
v.appendNull();
|
||||
return KeyValueRef(k.pack(), v.pack());
|
||||
}
|
||||
static inline KeyVersionValue unpack(KeyValueRef kv) {
|
||||
Tuple k = Tuple::unpack(kv.key);
|
||||
if(kv.value.size() == 0)
|
||||
return KeyVersionValue(k.getString(0), k.getInt(1));
|
||||
Tuple v = Tuple::unpack(kv.value);
|
||||
return KeyVersionValue(k.getString(0), k.getInt(1), v.getType(0) == Tuple::NULL_TYPE ? Optional<Value>() : v.getString(0));
|
||||
return KeyVersionValue(key, version, value);
|
||||
}
|
||||
|
||||
std::string toString() const {
|
||||
return format("'%s' -> '%s' @%lld", key.toString().c_str(), value.present() ? value.get().toString().c_str() : "<cleared>", version);
|
||||
return format("op=%d val=%s", op, printable(value).c_str());
|
||||
}
|
||||
};
|
||||
|
||||
// Represents mutations on a single key and a possible clear to a range that begins
|
||||
// immediately after that key
|
||||
typedef std::map<Version, SingleKeyMutation> SingleKeyMutationsByVersion;
|
||||
struct RangeMutation {
|
||||
// Mutations for exactly the start key
|
||||
SingleKeyMutationsByVersion startKeyMutations;
|
||||
// A clear range version, if cleared, for the range starting immediately AFTER the start key
|
||||
Optional<Version> rangeClearVersion;
|
||||
|
||||
// Returns true if this RangeMutation doesn't actually mutate anything
|
||||
bool noChanges() const {
|
||||
return !rangeClearVersion.present() && startKeyMutations.empty();
|
||||
}
|
||||
|
||||
std::string toString() const {
|
||||
std::string result;
|
||||
result.append("rangeClearVersion: ");
|
||||
if(rangeClearVersion.present())
|
||||
result.append(format("%lld", rangeClearVersion.get()));
|
||||
else
|
||||
result.append("<not present>");
|
||||
result.append(" startKeyMutations: ");
|
||||
for(SingleKeyMutationsByVersion::value_type const &m : startKeyMutations)
|
||||
result.append(format("[%lld => %s] ", m.first, m.second.toString().c_str()));
|
||||
return result;
|
||||
}
|
||||
};
|
||||
|
||||
typedef std::map<Key, RangeMutation> MutationBufferT;
|
||||
|
||||
/* Mutation Buffer Overview
|
||||
*
|
||||
* MutationBuffer maps the start of a range to a RangeMutation. The end of the range is
|
||||
* the next range start in the map.
|
||||
*
|
||||
* - The buffer starts out with keys '' and endKey.key already populated.
|
||||
*
|
||||
* - When a new key is inserted into the buffer map, it is by definition
|
||||
* splitting an existing range so it should take on the rangeClearVersion of
|
||||
* the immediately preceding key which is the start of that range
|
||||
*
|
||||
* - Keys are inserted into the buffer map for every individual operation (set/clear/atomic)
|
||||
* key and for both the start and end of a range clear.
|
||||
*
|
||||
* - To apply a single clear, add it to the individual ops only if the last entry is not also a clear.
|
||||
*
|
||||
* - To apply a range clear, after inserting the new range boundaries do the following to the start
|
||||
* boundary and all successive boundaries < end
|
||||
* - set the range clear version if not already set
|
||||
* - add a clear to the startKeyMutations if the final entry is not a clear.
|
||||
*
|
||||
* - Note that there are actually TWO valid ways to represent
|
||||
* set c = val1 at version 1
|
||||
* clear c\x00 to z at version 2
|
||||
* with this model. Either
|
||||
* c = { rangeClearVersion = 2, startKeyMutations = { 1 => val1 }
|
||||
* z = { rangeClearVersion = <not present>, startKeyMutations = {}
|
||||
* OR
|
||||
* c = { rangeClearVersion = <not present>, startKeyMutations = { 1 => val1 }
|
||||
* c\x00 = { rangeClearVersion = 2, startKeyMutations = { 2 => <not present> }
|
||||
* z = { rangeClearVersion = <not present>, startKeyMutations = {}
|
||||
*
|
||||
* This is because the rangeClearVersion applies to a range begining with the first
|
||||
* key AFTER the start key, so that the logic for reading the start key is more simple
|
||||
* as it only involves consulting startKeyMutations. When adding a clear range, the
|
||||
* boundary key insert/split described above is valid, and is what is currently done,
|
||||
* but it would also be valid to see if the last key before startKey is equal to
|
||||
* keyBefore(startKey), and if so that mutation buffer boundary key can be used instead
|
||||
* without adding an additional key to the buffer.
|
||||
*/
|
||||
|
||||
void printMutationBuffer(MutationBufferT::const_iterator begin, MutationBufferT::const_iterator end) {
|
||||
#if REDWOOD_DEBUG
|
||||
debug_printf("-------------------------------------\n");
|
||||
debug_printf("BUFFER\n");
|
||||
while(begin != end) {
|
||||
debug_printf("'%s': %s\n", printable(begin->first).c_str(), begin->second.toString().c_str());
|
||||
++begin;
|
||||
}
|
||||
debug_printf("-------------------------------------\n");
|
||||
#endif
|
||||
}
|
||||
|
||||
void printMutationBuffer() {
|
||||
return printMutationBuffer(m_buffer.begin(), m_buffer.end());
|
||||
}
|
||||
|
||||
void initMutationBuffer() {
|
||||
// Create range representing the entire keyspace. This reduces edge cases to applying mutations
|
||||
// because now all existing keys are within some range in the mutation map.
|
||||
m_buffer.clear();
|
||||
m_buffer[StringRef()];
|
||||
m_buffer[endKey.key];
|
||||
}
|
||||
|
||||
// Find or create a mutation buffer boundary for bound and return an iterator to it
|
||||
MutationBufferT::iterator insertMutationBoundary(Key boundary) {
|
||||
// Find the first split point in buffer that is >= key
|
||||
MutationBufferT::iterator ib = m_buffer.lower_bound(boundary);
|
||||
|
||||
// Since the initial state of the mutation buffer contains the range '' through
|
||||
// the maximum possible key, our search had to have found something.
|
||||
ASSERT(ib != m_buffer.end());
|
||||
|
||||
// If we found the boundary we are looking for, return its iterator
|
||||
if(ib->first == boundary)
|
||||
return ib;
|
||||
|
||||
// ib is our insert hint. Insert the new boundary and set ib to its entry
|
||||
ib = m_buffer.insert(ib, {boundary, RangeMutation()});
|
||||
|
||||
// ib is certainly > begin() because it is guaranteed that the empty string
|
||||
// boundary exists and the only way to have found that is to look explicitly
|
||||
// for it in which case we would have returned above.
|
||||
MutationBufferT::iterator iPrevious = ib;
|
||||
--ib;
|
||||
ib->second.rangeClearVersion = iPrevious->second.rangeClearVersion;
|
||||
|
||||
return ib;
|
||||
}
|
||||
|
||||
void buildNewRoot(Version version, vector<std::pair<int, Reference<IPage>>> &pages, std::vector<LogicalPageID> &logicalPageIDs, FixedSizeMap::KVPairsT &childEntries) {
|
||||
// While there are multiple child pages for this version we must write new tree levels.
|
||||
while(pages.size() > 1) {
|
||||
|
@ -469,17 +483,54 @@ private:
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
// Returns list of (version, list of (lower_bound, list of children) )
|
||||
ACTOR static Future<VersionedChildrenT> commitSubtree(VersionedBTree *self, Reference<IPagerSnapshot> snapshot, LogicalPageID root, Key lowerBoundKey, Key upperBoundKey, MutationBufferT::const_iterator iMutationMap, MutationBufferT::const_iterator iMutationMapEnd) {
|
||||
ACTOR static Future<VersionedChildrenT> commitSubtree(VersionedBTree *self, Reference<IPagerSnapshot> snapshot, LogicalPageID root, Key lowerBoundKey, Key upperBoundKey) {
|
||||
state std::string printPrefix = format("commit subtree(lowerboundkey %s, page %u) ", lowerBoundKey.toString().c_str(), root);
|
||||
debug_printf("%s\n", printPrefix.c_str());
|
||||
|
||||
if(iMutationMap == iMutationMapEnd) {
|
||||
debug_printf("%s no changes\n", printPrefix.c_str());
|
||||
// Decode the upper and lower bound keys for this subtree
|
||||
// Note that these could be truncated to be the shortest usable boundaries between two pages but that
|
||||
// still works for what we're using them for here, so instead of using the KVV unpacker we'll have to
|
||||
// something else here that supports incomplete keys.
|
||||
state KeyVersionValue lowerBoundKVV = KeyVersionValue::unpack(lowerBoundKey);
|
||||
state KeyVersionValue upperBoundKVV = KeyVersionValue::unpack(upperBoundKey);
|
||||
|
||||
// Find the slice of the mutation buffer that is relevant to this subtree
|
||||
state MutationBufferT::const_iterator iMutationBoundary = self->m_buffer.lower_bound(lowerBoundKVV.key);
|
||||
state MutationBufferT::const_iterator iMutationBoundaryEnd = self->m_buffer.lower_bound(upperBoundKVV.key);
|
||||
|
||||
// If the lower bound key and the upper bound key are the same then there can't be any changes to
|
||||
// this subtree since changes would happen after the upper bound key as the mutated versions would
|
||||
// necessarily be higher.
|
||||
if(lowerBoundKVV.key == upperBoundKVV.key) {
|
||||
debug_printf("%s no changes, lower and upper bound keys are the same.\n", printPrefix.c_str());
|
||||
return VersionedChildrenT({ {0,{{lowerBoundKey,root}}} });
|
||||
}
|
||||
|
||||
// If the mutation buffer key found is greater than the lower bound key then go to the previous mutation
|
||||
// buffer key because it may cover deletion of some keys at the start of this subtree.
|
||||
if(iMutationBoundary->first > lowerBoundKVV.key)
|
||||
--iMutationBoundary;
|
||||
else {
|
||||
// If the there are no mutations, we're done
|
||||
if(iMutationBoundary == iMutationBoundaryEnd) {
|
||||
debug_printf("%s no changes, mutation buffer start/end are the same\n", printPrefix.c_str());
|
||||
return VersionedChildrenT({ {0,{{lowerBoundKey,root}}} });
|
||||
}
|
||||
}
|
||||
|
||||
// Another way to have no mutations is to have a single mutation range cover this
|
||||
// subtree but have no changes in it
|
||||
MutationBufferT::const_iterator iMutationBoundaryNext = iMutationBoundary;
|
||||
++iMutationBoundaryNext;
|
||||
if(iMutationBoundaryNext->second.noChanges()) {
|
||||
debug_printf("%s no changes because sole mutation range was empty\n", printPrefix.c_str());
|
||||
return VersionedChildrenT({ {0,{{lowerBoundKey,root}}} });
|
||||
}
|
||||
|
||||
debug_printf("%s MUTATION BUFFER:\n", printPrefix.c_str());
|
||||
self->printMutationBuffer(iMutationBoundary, iMutationBoundaryEnd);
|
||||
|
||||
state FixedSizeMap map;
|
||||
Reference<const IPage> rawPage = wait(snapshot->getPhysicalPage(root));
|
||||
map = FixedSizeMap::decode(StringRef(rawPage->begin(), rawPage->size()));
|
||||
|
@ -493,7 +544,7 @@ private:
|
|||
SimpleFixedSizeMapRef::KVPairsT::const_iterator iExistingEnd = map.entries.end();
|
||||
|
||||
// It's a given that the mutation map is not empty so it's safe to do this
|
||||
Key mutationRangeStart = iMutationMap->first;
|
||||
Key mutationRangeStart = iMutationBoundary->first;
|
||||
|
||||
// There will be multiple loops advancing iExisting, existing will track its current value
|
||||
KeyVersionValue existing;
|
||||
|
@ -503,104 +554,79 @@ private:
|
|||
// If replacement pages are written they will be at the minimum version seen in the mutations for this leaf
|
||||
Version minVersion = std::numeric_limits<Version>::max();
|
||||
|
||||
// First read and output any existing values that are less than or equal to the first mutation key
|
||||
while(iExisting != iExistingEnd && existing.key <= mutationRangeStart) {
|
||||
merged.push_back(existing.pack());
|
||||
debug_printf("Added existing pre range %s\n", KeyVersionValue::unpack(merged.back()).toString().c_str());
|
||||
++iExisting;
|
||||
if(iExisting != iExistingEnd) {
|
||||
lastExisting = existing;
|
||||
existing = KeyVersionValue::unpack(*iExisting);
|
||||
}
|
||||
}
|
||||
// Now, process each mutation range and merge changes with existing data.
|
||||
while(iMutationBoundary != iMutationBoundaryEnd) {
|
||||
debug_printf("%s Processing Mutation Range: '%s' to '%s'\n", printPrefix.c_str(), printable(iMutationBoundary->first).c_str(), printable(iMutationBoundaryEnd->first).c_str());
|
||||
debug_printf("%s Mutations: %s\n", printPrefix.c_str(), iMutationBoundary->second.toString().c_str());
|
||||
|
||||
while(iMutationMap != iMutationMapEnd) {
|
||||
printf("mutationRangeStart %s\n", printable(mutationRangeStart).c_str());
|
||||
MutationsByVersionT::const_iterator iMutations = iMutationMap->second.begin();
|
||||
MutationsByVersionT::const_iterator iMutationsEnd = iMutationMap->second.end();
|
||||
SingleKeyMutationsByVersion::const_iterator iMutations;
|
||||
|
||||
// The end of the mutation range is the next key in the mutation map
|
||||
++iMutationMap;
|
||||
Key mutationRangeEnd = (iMutationMap == iMutationMapEnd) ? endKey : iMutationMap->first;
|
||||
printf("mutationRangeEnd %s\n", printable(mutationRangeEnd).c_str());
|
||||
// If the mutation boundary key is the same as the page lowerBound key then start reading single
|
||||
// key mutations at the first version greater than the lowerBoundKey version.
|
||||
if(iMutationBoundary->first == lowerBoundKVV.key)
|
||||
iMutations = iMutationBoundary->second.startKeyMutations.upper_bound(lowerBoundKVV.version);
|
||||
else
|
||||
iMutations = iMutationBoundary->second.startKeyMutations.begin();
|
||||
|
||||
// Tracks whether there was a clearRange covering mutationRange to mutationEnd (or beyond) in the mutation set
|
||||
Version rangeClearedVersion = invalidVersion;
|
||||
SingleKeyMutationsByVersion::const_iterator iMutationsEnd = iMutationBoundary->second.startKeyMutations.end();
|
||||
|
||||
// Now insert the mutations which affect the individual key of mutationRangeStart
|
||||
bool firstMutation = true;
|
||||
while(iMutations != iMutationsEnd) {
|
||||
Mutation *m = *iMutations;
|
||||
debug_printf("mutation: %s first: %d\n", m->toString().c_str(), firstMutation);
|
||||
// Potentially update earliest version of mutations being applied.
|
||||
if(m->version < minVersion)
|
||||
minVersion = m->version;
|
||||
|
||||
if(m->isClear()) {
|
||||
// Any clear range mutation applies at least from mutationRangeStart to mutationRangeEnd, by definition,
|
||||
// because clears cause insertions of split points into the mutation buffer
|
||||
if(rangeClearedVersion == invalidVersion)
|
||||
rangeClearedVersion = m->version;
|
||||
|
||||
// Here we're only handling clears for the key mutationRangeStart, if the rest of the range
|
||||
// is cleared it will be handled below
|
||||
// Only write a clear if this is either not the first mutation OR
|
||||
// lastExisting was a clear for the same key
|
||||
if(!firstMutation || (lastExisting.value.present() && lastExisting.key == mutationRangeStart)) {
|
||||
merged.push_back(KeyVersionValue(mutationRangeStart, m->version).pack());
|
||||
debug_printf("Added clear of existing or from mutation buffer %s\n", KeyVersionValue::unpack(merged.back()).toString().c_str());
|
||||
}
|
||||
}
|
||||
else if(m->isSet()) {
|
||||
// Write the new value if this is not the first mutation or they is different from
|
||||
// lastExisting or the lastExisting value was either empty or not the same as m.
|
||||
if( !firstMutation
|
||||
|| lastExisting.key != m->key
|
||||
|| !lastExisting.value.present()
|
||||
|| lastExisting.value.get() != m->valueOrEnd
|
||||
)
|
||||
merged.push_back(KeyVersionValue(m->key, m->version, m->valueOrEnd).pack());
|
||||
debug_printf("Added set mutation %s\n", KeyVersionValue::unpack(merged.back()).toString().c_str());
|
||||
}
|
||||
else { // isAtomicOp
|
||||
ASSERT(m->isAtomicOp());
|
||||
// TODO: apply atomic op to last existing, update lastExisting
|
||||
ASSERT(false);
|
||||
}
|
||||
++iMutations;
|
||||
if(firstMutation)
|
||||
firstMutation = false;
|
||||
}
|
||||
|
||||
// Next, while the existing key is in this mutation range output it and also emit
|
||||
// a clear at each key if required.
|
||||
while(iExisting != iExistingEnd && existing.key < mutationRangeEnd) {
|
||||
// Output old versions of the mutation boundary key
|
||||
while(iExisting != iExistingEnd && existing.key == iMutationBoundary->first) {
|
||||
merged.push_back(existing.pack());
|
||||
debug_printf("Added existing in range %s\n", KeyVersionValue::unpack(merged.back()).toString().c_str());
|
||||
debug_printf("Added existing version of mutation boundary start key: %s\n", KeyVersionValue::unpack(merged.back()).toString().c_str());
|
||||
|
||||
++iExisting;
|
||||
if(iExisting != iExistingEnd) {
|
||||
lastExisting = existing;
|
||||
if(iExisting != iExistingEnd)
|
||||
existing = KeyVersionValue::unpack(*iExisting);
|
||||
// If the next key is different than the last then add a clear of the last key
|
||||
// at the range clear version if the range was cleared
|
||||
if(existing.key != lastExisting.key && rangeClearedVersion != invalidVersion) {
|
||||
merged.push_back(KeyVersionValue(lastExisting.key, rangeClearedVersion).pack());
|
||||
debug_printf("Added clear of existing, next key different %s\n", KeyVersionValue::unpack(merged.back()).toString().c_str());
|
||||
}
|
||||
}
|
||||
else if(rangeClearedVersion != invalidVersion) {
|
||||
// If there are no more existing keys but the range was cleared then we might have to add a clear of the last key
|
||||
// but only if the key encoded in upperBoundKey is not the same, which case the clear belongs in that page.
|
||||
if(existing.key != KeyVersionValue::unpack(KeyValueRef(upperBoundKey, StringRef())).key) {
|
||||
merged.push_back(KeyVersionValue(existing.key, rangeClearedVersion).pack());
|
||||
debug_printf("Added clear of last existing %s\n", KeyVersionValue::unpack(merged.back()).toString().c_str());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
mutationRangeStart = mutationRangeEnd;
|
||||
// Output mutations for the mutation boundary start key
|
||||
while(iMutations != iMutationsEnd) {
|
||||
merged.push_back(iMutations->second.toKVV(iMutationBoundary->first, iMutations->first).pack());
|
||||
debug_printf("Added mutation of boundary start key: %s\n", KeyVersionValue::unpack(merged.back()).toString().c_str());
|
||||
++iMutations;
|
||||
}
|
||||
|
||||
// Get the clear version for this range, which is the last thing that we need from it,
|
||||
Optional<Version> clearRangeVersion = iMutationBoundary->second.rangeClearVersion;
|
||||
// Advance to the next boundary because we need to know the end key for the current range.
|
||||
++iMutationBoundary;
|
||||
|
||||
// Write existing keys which are less than the next mutation boundary key, clearing if needed.
|
||||
while(iExisting != iExistingEnd && existing.key < iMutationBoundary->first) {
|
||||
merged.push_back(existing.pack());
|
||||
debug_printf("Added existing key in mutation range: %s\n", KeyVersionValue::unpack(merged.back()).toString().c_str());
|
||||
|
||||
// Write a clear of this key if needed. A clear is required if clearRangeVersion is set and the next key is different
|
||||
// than this one. Note that the next key might be the in our right sibling, we can use the page upperBound to get that.
|
||||
++iExisting;
|
||||
KeyVersionValue nextEntry;
|
||||
if(iExisting != iExistingEnd)
|
||||
nextEntry = KeyVersionValue::unpack(*iExisting);
|
||||
else
|
||||
nextEntry = upperBoundKVV;
|
||||
|
||||
if(clearRangeVersion.present() && existing.key != nextEntry.key) {
|
||||
merged.push_back(KeyVersionValue(existing.key, clearRangeVersion.get()).pack());
|
||||
debug_printf("Added clear of existing key in mutation range: %s\n", KeyVersionValue::unpack(merged.back()).toString().c_str());
|
||||
}
|
||||
|
||||
if(iExisting != iExistingEnd)
|
||||
existing = nextEntry;
|
||||
}
|
||||
|
||||
// Write any remaining existing keys, which are not subject to clears as they are beyond the cleared range.
|
||||
while(iExisting != iExistingEnd) {
|
||||
merged.push_back(existing.pack());
|
||||
debug_printf("Added existing tail key: %s\n", KeyVersionValue::unpack(merged.back()).toString().c_str());
|
||||
|
||||
++iExisting;
|
||||
if(iExisting != iExistingEnd)
|
||||
existing = KeyVersionValue::unpack(*iExisting);
|
||||
}
|
||||
}
|
||||
debug_printf("DONE MERGING MUTATIONS WITH EXISTING LEAF CONTENTS\n");
|
||||
|
||||
debug_printf("%s DONE MERGING MUTATIONS WITH EXISTING LEAF CONTENTS\n", printPrefix.c_str());
|
||||
|
||||
// TODO: Make version and key splits based on contents of merged list
|
||||
|
||||
|
@ -644,26 +670,13 @@ private:
|
|||
else {
|
||||
state std::vector<Future<VersionedChildrenT>> m_futureChildren;
|
||||
|
||||
auto childMutBegin = iMutationMap;
|
||||
|
||||
for(int i=0; i<map.entries.size(); i++) {
|
||||
MutationBufferT::const_iterator childMutEnd;
|
||||
Key upperBoundKey;
|
||||
if (i+1 != map.entries.size()) {
|
||||
upperBoundKey = map.entries[i+1].key;
|
||||
childMutEnd = self->m_buffer.lower_bound( KeyVersionValue::unpack(KeyValueRef(upperBoundKey, ValueRef())).key );
|
||||
}
|
||||
else {
|
||||
upperBoundKey = endKey;
|
||||
childMutEnd = iMutationMapEnd;
|
||||
}
|
||||
|
||||
// The next child page might have existing kv pairs that are covered by the previous mutation range
|
||||
if(childMutBegin != iMutationMap)
|
||||
--childMutBegin;
|
||||
|
||||
m_futureChildren.push_back(self->commitSubtree(self, snapshot, *(uint32_t*)map.entries[i].value.begin(), map.entries[i].key, upperBoundKey, childMutBegin, childMutEnd));
|
||||
childMutBegin = childMutEnd;
|
||||
Key currentUpperBound;
|
||||
if(i + 1 < map.entries.size())
|
||||
currentUpperBound = map.entries[i].key;
|
||||
else
|
||||
currentUpperBound = upperBoundKey;
|
||||
m_futureChildren.push_back(self->commitSubtree(self, snapshot, *(uint32_t*)map.entries[i].value.begin(), map.entries[i].key, currentUpperBound));
|
||||
}
|
||||
|
||||
Void _ = wait(waitForAll(m_futureChildren));
|
||||
|
@ -797,26 +810,16 @@ private:
|
|||
ACTOR static Future<Void> commit_impl(VersionedBTree *self) {
|
||||
Version latestVersion = wait(self->m_pager->getLatestVersion());
|
||||
|
||||
debug_printf("-------------------------------------\n");
|
||||
debug_printf("BEGIN COMMIT. MUTATION BUFFER:\n");
|
||||
for(auto &i : self->m_buffer) {
|
||||
debug_printf("'%s'\n", printable(i.first).c_str());
|
||||
for(auto j : i.second) {
|
||||
debug_printf("\t%s\n", j->toString().c_str());
|
||||
}
|
||||
}
|
||||
debug_printf("-------------------------------------\n");
|
||||
debug_printf("BEGINNING COMMIT.\n");
|
||||
self->printMutationBuffer();
|
||||
|
||||
VersionedChildrenT _ = wait(commitSubtree(self, self->m_pager->getReadSnapshot(latestVersion), self->m_root, StringRef(), endKey, self->m_buffer.begin(), self->m_buffer.end()));
|
||||
VersionedChildrenT _ = wait(commitSubtree(self, self->m_pager->getReadSnapshot(latestVersion), self->m_root, StringRef(), endKey.pack().key));
|
||||
|
||||
self->m_pager->setLatestVersion(self->m_writeVersion);
|
||||
Void _ = wait(self->m_pager->commit());
|
||||
self->m_lastCommittedVersion = self->m_writeVersion;
|
||||
|
||||
self->m_buffer.clear();
|
||||
for(Mutation *m : self->m_mutations)
|
||||
delete m;
|
||||
self->m_mutations.clear();
|
||||
self->initMutationBuffer();
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
@ -824,10 +827,6 @@ private:
|
|||
IPager *m_pager;
|
||||
MutationBufferT m_buffer;
|
||||
|
||||
// TODO: Use or lose this
|
||||
MutationBufferPerVersionT m_buffer_byVersion;
|
||||
|
||||
std::vector<Mutation *> m_mutations;
|
||||
Version m_writeVersion;
|
||||
Version m_lastCommittedVersion;
|
||||
int m_page_size_override;
|
||||
|
@ -917,7 +916,7 @@ private:
|
|||
};
|
||||
};
|
||||
|
||||
Key VersionedBTree::endKey = Key(LiteralStringRef("\xff\xff\xff\xff\xff\xff\xff\xff"));
|
||||
KeyVersionValue VersionedBTree::endKey(LiteralStringRef("\xff\xff\xff\xff\xff\xff\xff\xff"), std::numeric_limits<Version>::max());
|
||||
|
||||
KeyValue randomKV(int keySize = 10, int valueSize = 5) {
|
||||
int kLen = g_random->randomInt(1, keySize);
|
||||
|
@ -1110,7 +1109,7 @@ TEST_CASE("/redwood/performance/set") {
|
|||
return Void();
|
||||
}
|
||||
|
||||
std::string SimpleFixedSizeMapRef::toString() {
|
||||
std::string SimpleFixedSizeMapRef::toString() const {
|
||||
std::string result;
|
||||
result.append(format("flags=0x%x data: ", flags));
|
||||
for(auto const &kv : entries) {
|
||||
|
|
Loading…
Reference in New Issue