Merge pull request #4933 from sfc-gh-satherton/redwood-deltatree2-master
Redwood DeltaTree2 Refactor, Evict obsolete Pages sooner, FlowMutex
This commit is contained in:
commit
389d6fcbe9
|
@ -1484,3 +1484,133 @@ TEST_CASE("/flow/flow/PromiseStream/move2") {
|
|||
ASSERT(movedTracker.copied == 0);
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Upper bound, in seconds, of the random delays used by the FlowMutex test actors below.
constexpr double mutexTestDelay = 0.00001;
|
||||
|
||||
// Test actor for FlowMutex: loops n times, each iteration waiting a small random delay,
// taking the mutex, waiting another random delay while holding it, then releasing it.
// If allowError is true then on the final iteration (n == 0) the actor randomly either
// sends an explicit error (end_of_stream) through the lock, or drops the lock's promise
// without releasing it.
// verbose is a pointer so the test harness can share one logging flag across all actors.
ACTOR Future<Void> mutexTest(int id, FlowMutex* mutex, int n, bool allowError, bool* verbose) {
	while (n-- > 0) {
		// Random delay while not holding the mutex
		state double d = deterministicRandom()->random01() * mutexTestDelay;
		if (*verbose) {
			printf("%d:%d wait %f while unlocked\n", id, n, d);
		}
		wait(delay(d));

		if (*verbose) {
			printf("%d:%d locking\n", id, n);
		}
		state FlowMutex::Lock lock = wait(mutex->take());
		if (*verbose) {
			printf("%d:%d locked\n", id, n);
		}

		// Random delay while holding the mutex
		d = deterministicRandom()->random01() * mutexTestDelay;
		if (*verbose) {
			printf("%d:%d wait %f while locked\n", id, n, d);
		}
		wait(delay(d));

		// On the last iteration, send an error or drop the lock if allowError is true
		if (n == 0 && allowError) {
			if (deterministicRandom()->coinflip()) {
				// Send explicit error
				if (*verbose) {
					printf("%d:%d sending error\n", id, n);
				}
				lock.error(end_of_stream());
			} else {
				// Do nothing
				if (*verbose) {
					printf("%d:%d dropping promise, returning without unlock\n", id, n);
				}
			}
		} else {
			if (*verbose) {
				printf("%d:%d unlocking\n", id, n);
			}
			lock.release();
		}
	}

	if (*verbose) {
		printf("%d Returning\n", id);
	}
	return Void();
}
|
||||
|
||||
// Stress test for FlowMutex: runs many iterations, each launching 10 mutexTest actors
// that contend on one FlowMutex. In roughly half of the iterations (allowErrors) the
// actors may inject an error or abandon the lock; the test asserts that an error is
// observed exactly when that is allowed.
TEST_CASE("/flow/flow/FlowMutex") {
	state int count = 100000;

	// Default verboseness
	state bool verboseSetting = false;
	// Useful for debugging, enable verbose mode for this iteration number
	state int verboseTestIteration = -1;

	try {
		// NOTE(review): verbose is evaluated once, before the loop, so the comparison
		// against verboseTestIteration only sees the initial count value; it looks like
		// this was intended to be re-evaluated each iteration — confirm.
		state bool verbose = verboseSetting || count == verboseTestIteration;

		while (--count > 0) {
			// Progress indicator
			if (count % 1000 == 0) {
				printf("%d tests left\n", count);
			}

			state FlowMutex mutex;
			state std::vector<Future<Void>> tests;

			// Randomly decide whether actors are allowed to error/abandon on their last iteration
			state bool allowErrors = deterministicRandom()->coinflip();
			if (verbose) {
				printf("\nTesting allowErrors=%d\n", allowErrors);
			}

			state Optional<Error> error;

			try {
				for (int i = 0; i < 10; ++i) {
					tests.push_back(mutexTest(i, &mutex, 10, allowErrors, &verbose));
				}
				wait(waitForAll(tests));

				if (allowErrors) {
					if (verbose) {
						printf("Final wait in case error was injected by the last actor to finish\n");
					}
					wait(success(mutex.take()));
				}
			} catch (Error& e) {
				if (verbose) {
					printf("Caught error %s\n", e.what());
				}
				error = e;

				// Wait for all actors still running to finish their waits and try to take the mutex
				if (verbose) {
					printf("Waiting for completions\n");
				}
				wait(delay(2 * mutexTestDelay));

				if (verbose) {
					printf("Future end states:\n");
				}
				// All futures should be ready, some with errors.
				bool allReady = true;
				for (int i = 0; i < tests.size(); ++i) {
					auto f = tests[i];
					if (verbose) {
						printf(
						    " %d: %s\n", i, f.isReady() ? (f.isError() ? f.getError().what() : "done") : "not ready");
					}
					allReady = allReady && f.isReady();
				}
				ASSERT(allReady);
			}

			// If an error was caused, one should have been detected.
			// Otherwise, no errors should be detected.
			ASSERT(error.present() == allowErrors);
		}
	} catch (Error& e) {
		// count + 1 because the failing iteration already decremented count
		printf("Error at count=%d\n", count + 1);
		ASSERT(false);
	}

	return Void();
}
|
||||
|
|
|
@ -27,7 +27,6 @@ set(FDBSERVER_SRCS
|
|||
IKeyValueContainer.h
|
||||
IKeyValueStore.h
|
||||
IPager.h
|
||||
IVersionedStore.h
|
||||
KeyValueStoreCompressTestData.actor.cpp
|
||||
KeyValueStoreMemory.actor.cpp
|
||||
KeyValueStoreRocksDB.actor.cpp
|
||||
|
|
|
@ -26,6 +26,14 @@
|
|||
#include "fdbserver/Knobs.h"
|
||||
#include <string.h>
|
||||
|
||||
// Set to 1 to enable verbose DeltaTree debug logging via deltatree_printf
#define DELTATREE_DEBUG 0

#if DELTATREE_DEBUG
#define deltatree_printf(...) printf(__VA_ARGS__)
#else
// Compiles away to nothing when debug logging is disabled
#define deltatree_printf(...)
#endif
|
||||
|
||||
typedef uint64_t Word;
|
||||
// Get the number of prefix bytes that are the same between a and b, up to their common length of cl
|
||||
static inline int commonPrefixLength(uint8_t const* ap, uint8_t const* bp, int cl) {
|
||||
|
@ -198,10 +206,6 @@ struct DeltaTree {
|
|||
smallOffsets.left = offset;
|
||||
}
|
||||
}
|
||||
|
||||
int size(bool large) const {
|
||||
return delta(large).size() + (large ? sizeof(smallOffsets) : sizeof(largeOffsets));
|
||||
}
|
||||
};
|
||||
|
||||
static constexpr int SmallSizeLimit = std::numeric_limits<uint16_t>::max();
|
||||
|
@ -356,8 +360,6 @@ public:
|
|||
|
||||
Mirror(const void* treePtr = nullptr, const T* lowerBound = nullptr, const T* upperBound = nullptr)
|
||||
: tree((DeltaTree*)treePtr), lower(lowerBound), upper(upperBound) {
|
||||
// TODO: Remove these copies into arena and require users of Mirror to keep prev and next alive during its
|
||||
// lifetime
|
||||
lower = new (arena) T(arena, *lower);
|
||||
upper = new (arena) T(arena, *upper);
|
||||
|
||||
|
@ -875,7 +877,10 @@ private:
|
|||
|
||||
int deltaSize = item.writeDelta(node.delta(largeNodes), *base, commonPrefix);
|
||||
node.delta(largeNodes).setPrefixSource(prefixSourcePrev);
|
||||
// printf("Serialized %s to %p\n", item.toString().c_str(), &root.delta(largeNodes));
|
||||
deltatree_printf("Serialized %s to offset %d data: %s\n",
|
||||
item.toString().c_str(),
|
||||
(uint8_t*)&node - (uint8_t*)this,
|
||||
StringRef((uint8_t*)&node.delta(largeNodes), deltaSize).toHexString().c_str());
|
||||
|
||||
// Continue writing after the serialized Delta.
|
||||
uint8_t* wptr = (uint8_t*)&node.delta(largeNodes) + deltaSize;
|
||||
|
@ -899,3 +904,823 @@ private:
|
|||
return wptr - (uint8_t*)&node;
|
||||
}
|
||||
};
|
||||
|
||||
// DeltaTree2 is a memory mappable binary tree of T objects such that each node's item is
|
||||
// stored as a Delta which can reproduce the node's T item given either
|
||||
// - The node's greatest lesser ancestor, called the "left parent"
|
||||
// - The node's least greater ancestor, called the "right parent"
|
||||
// One of these ancestors will also happen to be the node's direct parent.
|
||||
//
|
||||
// The Delta type is intended to make use of ordered prefix compression and borrow all
|
||||
// available prefix bytes from the ancestor T which shares the most prefix bytes with
|
||||
// the item T being encoded. If T is implemented properly, this results in perfect
|
||||
// prefix compression while performing O(log n) comparisons for a seek.
|
||||
//
|
||||
// T requirements
|
||||
//
|
||||
// Must be compatible with Standalone<T> and must implement the following additional things:
|
||||
//
|
||||
// // Return the common prefix length between *this and T
|
||||
// // skipLen is a hint, representing the length that is already known to be common.
|
||||
// int getCommonPrefixLen(const T& other, int skipLen) const;
|
||||
//
|
||||
// // Compare *this to rhs, returns < 0 for less than, 0 for equal, > 0 for greater than
|
||||
// // skipLen is a hint, representing the length that is already known to be common.
|
||||
// int compare(const T &rhs, int skipLen) const;
|
||||
//
|
||||
// // Writes to d a delta which can create *this from base
|
||||
// // commonPrefix is a hint, representing the length that is already known to be common.
|
||||
// // DeltaT's size need not be static, for more details see below.
|
||||
// void writeDelta(DeltaT &d, const T &base, int commonPrefix) const;
|
||||
//
|
||||
// // Returns the size in bytes of the DeltaT required to recreate *this from base
|
||||
// int deltaSize(const T &base) const;
|
||||
//
|
||||
// // A type which represents the parts of T that either borrowed from the base T
|
||||
// // or can be borrowed by other T's using the first T as a base
|
||||
// // Partials must allocate any heap storage in the provided Arena for any operation.
|
||||
// typedef Partial;
|
||||
//
|
||||
// // Update cache with the Partial for *this, storing any heap memory for the Partial in arena
|
||||
// void updateCache(Optional<Partial> cache, Arena& arena) const;
|
||||
//
|
||||
// // For debugging, return a useful human-readable string representation of *this
|
||||
// std::string toString() const;
|
||||
//
|
||||
// DeltaT requirements
|
||||
//
|
||||
// DeltaT can be variable sized, larger than sizeof(DeltaT), and implement the following:
|
||||
//
|
||||
// // Returns the size in bytes of this specific DeltaT instance
|
||||
// int size();
|
||||
//
|
||||
// // Apply *this to base and return the resulting T
|
||||
// // Store the Partial for T into cache, allocating any heap memory for the Partial in arena
|
||||
// T apply(Arena& arena, const T& base, Optional<T::Partial>& cache);
|
||||
//
|
||||
// // Recreate T from *this and the Partial for T
|
||||
// T apply(const T::Partial& cache);
|
||||
//
|
||||
// // Set or retrieve a boolean flag representing which base ancestor the DeltaT is to be applied to
|
||||
// void setPrefixSource(bool val);
|
||||
// bool getPrefixSource() const;
|
||||
//
|
||||
// // Set or retrieve a boolean flag representing that a DeltaTree node has been erased
|
||||
// void setDeleted(bool val);
|
||||
// bool getDeleted() const;
|
||||
//
|
||||
// // For debugging, return a useful human-readable string representation of *this
|
||||
// std::string toString() const;
|
||||
//
|
||||
#pragma pack(push, 1)
|
||||
template <typename T, typename DeltaT = typename T::Delta>
|
||||
struct DeltaTree2 {
|
||||
typedef typename T::Partial Partial;
|
||||
|
||||
struct {
|
||||
uint16_t numItems; // Number of items in the tree.
|
||||
uint32_t nodeBytesUsed; // Bytes used by nodes (everything after the tree header)
|
||||
uint32_t nodeBytesFree; // Bytes left at end of tree to expand into
|
||||
uint32_t nodeBytesDeleted; // Delta bytes deleted from tree. Note that some of these bytes could be borrowed by
|
||||
// descendents.
|
||||
uint8_t initialHeight; // Height of tree as originally built
|
||||
uint8_t maxHeight; // Maximum height of tree after any insertion. Value of 0 means no insertions done.
|
||||
bool largeNodes; // Node size, can be calculated as capacity > SmallSizeLimit but it will be used a lot
|
||||
};
|
||||
|
||||
// Node is not fixed size. Most node methods require the context of whether the node is in small or large
// offset mode, passed as a boolean
struct Node {
	// Offsets are relative to the start of the DeltaTree
	// An offset of 0 means "no child" (see DeltaTree2::nodeAt)
	union {
		struct {
			uint32_t leftChild;
			uint32_t rightChild;

		} largeOffsets;
		struct {
			uint16_t leftChild;
			uint16_t rightChild;
		} smallOffsets;
	};

	// Size of the offsets header for the given node mode
	static int headerSize(bool large) { return large ? sizeof(largeOffsets) : sizeof(smallOffsets); }

	// Delta is located after the offsets, which differs by node size
	DeltaT& delta(bool large) { return large ? *(DeltaT*)(&largeOffsets + 1) : *(DeltaT*)(&smallOffsets + 1); };

	// Delta is located after the offsets, which differs by node size
	const DeltaT& delta(bool large) const {
		return large ? *(DeltaT*)(&largeOffsets + 1) : *(DeltaT*)(&smallOffsets + 1);
	};

	std::string toString(DeltaTree2* tree) const {
		return format("Node{offset=%d leftChild=%d rightChild=%d delta=%s}",
		              tree->nodeOffset(this),
		              getLeftChildOffset(tree->largeNodes),
		              getRightChildOffset(tree->largeNodes),
		              delta(tree->largeNodes).toString().c_str());
	}

// Accessor helpers selecting the active union member based on the node mode
#define getMember(m) (large ? largeOffsets.m : smallOffsets.m)
#define setMember(m, v)                                                                                                \
	if (large) {                                                                                                       \
		largeOffsets.m = v;                                                                                            \
	} else {                                                                                                           \
		smallOffsets.m = v;                                                                                            \
	}

	void setRightChildOffset(bool large, int offset) { setMember(rightChild, offset); }
	void setLeftChildOffset(bool large, int offset) { setMember(leftChild, offset); }

	int getRightChildOffset(bool large) const { return getMember(rightChild); }
	int getLeftChildOffset(bool large) const { return getMember(leftChild); }

	// Total footprint of this node: its delta plus the offsets header
	int size(bool large) const { return delta(large).size() + headerSize(large); }
#undef getMember
#undef setMember
};
|
||||
|
||||
// Trees whose capacity fits in a uint16_t use small (2-byte) child offsets
static constexpr int SmallSizeLimit = std::numeric_limits<uint16_t>::max();
// Extra per-node bytes incurred when large (4-byte) offsets must be used
static constexpr int LargeTreePerNodeExtraOverhead = sizeof(Node::largeOffsets) - sizeof(Node::smallOffsets);

// Byte offset of node n relative to the start of this tree
int nodeOffset(const Node* n) const { return (uint8_t*)n - (uint8_t*)this; }
// Resolve a byte offset to a Node pointer; offset 0 means "no node"
Node* nodeAt(int offset) { return offset == 0 ? nullptr : (Node*)((uint8_t*)this + offset); }
// Root node is located immediately after this header struct; nullptr if the tree is empty
Node* root() { return numItems == 0 ? nullptr : (Node*)(this + 1); }
int rootOffset() { return sizeof(DeltaTree2); }

// Bytes in use: header plus all node bytes
int size() const { return sizeof(DeltaTree2) + nodeBytesUsed; }
// Bytes in use plus free space still available for insertions
int capacity() const { return size() + nodeBytesFree; }
|
||||
|
||||
public:
|
||||
// DecodedNode represents a Node of a DeltaTree and its T::Partial.
// DecodedNodes are created on-demand, as DeltaTree Nodes are visited by a Cursor.
// DecodedNodes link together to form a binary tree with the same Node relationships as their
// corresponding DeltaTree Nodes. Additionally, DecodedNodes store links to their left and
// right ancestors which correspond to possible base Nodes on which the Node's Delta is based.
//
// DecodedNode links are not pointers, but rather indices to be looked up in the DecodeCache
// defined below. An index value of -1 is uninitialized, meaning it is not yet known whether
// the corresponding DeltaTree Node link is non-null in any version of the DeltaTree which is
// using or has used the DecodeCache.
struct DecodedNode {
	DecodedNode(int nodeOffset, int leftParentIndex, int rightParentIndex)
	  : nodeOffset(nodeOffset), leftParentIndex(leftParentIndex), rightParentIndex(rightParentIndex),
	    leftChildIndex(-1), rightChildIndex(-1) {}

	// Byte offset of the corresponding Node within the DeltaTree
	int nodeOffset;
	// DecodeCache indices of ancestors and children; -1 means not yet resolved
	int16_t leftParentIndex;
	int16_t rightParentIndex;
	int16_t leftChildIndex;
	int16_t rightChildIndex;
	// Lazily populated partial decode of this node's item (see T::Partial)
	Optional<Partial> partial;

	// Resolve this DecodedNode to its Node within a specific tree version
	Node* node(DeltaTree2* tree) const { return tree->nodeAt(nodeOffset); }

	std::string toString() {
		return format("DecodedNode{nodeOffset=%d leftChildIndex=%d rightChildIndex=%d leftParentIndex=%d "
		              "rightParentIndex=%d}",
		              (int)nodeOffset,
		              (int)leftChildIndex,
		              (int)rightChildIndex,
		              (int)leftParentIndex,
		              (int)rightParentIndex);
	}
};
|
||||
#pragma pack(pop)
|
||||
|
||||
// The DecodeCache is a reference counted structure that stores DecodedNodes by an integer index
// and can be shared across a series of updated copies of a DeltaTree.
//
// DecodedNodes are stored in a contiguous vector, which sometimes must be expanded, so care
// must be taken to resolve DecodedNode pointers again after the DecodeCache has new entries added.
struct DecodeCache : FastAllocated<DecodeCache>, ReferenceCounted<DecodeCache> {
	DecodeCache(const T& lowerBound = T(), const T& upperBound = T())
	  : lowerBound(arena, lowerBound), upperBound(arena, upperBound) {
		decodedNodes.reserve(10);
		deltatree_printf("DecodedNode size: %d\n", sizeof(DecodedNode));
	}

	// Backing storage for the bounds and for Partials produced while decoding
	Arena arena;
	T lowerBound;
	T upperBound;

	// Index 0 is always the root
	std::vector<DecodedNode> decodedNodes;

	// The returned reference is invalidated by a later emplace_new() (vector may reallocate)
	DecodedNode& get(int index) { return decodedNodes[index]; }

	// Append a new DecodedNode constructed from args and return its index
	template <class... Args>
	int emplace_new(Args&&... args) {
		int index = decodedNodes.size();
		decodedNodes.emplace_back(args...);
		return index;
	}

	bool empty() const { return decodedNodes.empty(); }

	// Discard all DecodedNodes and release accumulated arena memory by copying
	// the bounds into a fresh arena before swapping it in.
	void clear() {
		decodedNodes.clear();
		Arena a;
		lowerBound = T(a, lowerBound);
		upperBound = T(a, upperBound);
		arena = a;
	}
};
|
||||
|
||||
// Cursor provides a way to seek into a DeltaTree and iterate over its contents
|
||||
// The cursor needs a DeltaTree pointer and a DecodeCache, which can be shared
|
||||
// with other DeltaTrees which were incrementally modified to produce the
|
||||
// tree that this cursor is referencing.
|
||||
struct Cursor {
|
||||
// Default-constructed cursors are invalid. Initialize tree as well: the
// original left it indeterminate, so any accidental use (e.g. switchTree or
// a validity check pattern reading tree) touched uninitialized memory.
Cursor() : tree(nullptr), cache(nullptr), nodeIndex(-1) {}
|
||||
|
||||
// Construct a cursor over tree using the given shared DecodeCache; initially invalid
Cursor(DecodeCache* cache, DeltaTree2* tree) : cache(cache), tree(tree), nodeIndex(-1) {}
|
||||
|
||||
// Construct a cursor positioned at an existing DecodeCache index
Cursor(DecodeCache* cache, DeltaTree2* tree, int nodeIndex) : cache(cache), tree(tree), nodeIndex(nodeIndex) {}
|
||||
|
||||
// Copy constructor does not copy item because normally a copied cursor will be immediately moved.
|
||||
Cursor(const Cursor& c) : cache(c.cache), tree(c.tree), nodeIndex(c.nodeIndex) {}
|
||||
|
||||
// Return a copy of this cursor advanced to the next visible (non-deleted) item
Cursor next() const {
	Cursor c = *this;
	c.moveNext();
	return c;
}
|
||||
|
||||
// Return a copy of this cursor moved back to the previous visible (non-deleted) item
Cursor previous() const {
	Cursor c = *this;
	c.movePrev();
	return c;
}
|
||||
|
||||
// Return the DecodeCache index of the root node, creating its DecodedNode on first
// use. Returns -1 for an empty tree.
// NOTE(review): when the cache is non-empty, index 0 is returned without consulting
// this tree's numItems — presumably a shared cache is only paired with tree versions
// that have a root; confirm with callers.
int rootIndex() {
	if (!cache->empty()) {
		return 0;
	} else if (tree->numItems != 0) {
		return cache->emplace_new(tree->rootOffset(), -1, -1);
	}
	return -1;
}
|
||||
|
||||
// Tree version this cursor reads; may be retargeted via switchTree()
DeltaTree2* tree;
// Shared decode cache, possibly spanning multiple versions of the tree
DecodeCache* cache;
// Index into cache of the current node, or -1 if the cursor is invalid
int nodeIndex;
// Lazily decoded copy of the current item; reset whenever the cursor moves
mutable Optional<T> item;
|
||||
|
||||
// Resolve the cursor's current DecodedNode to its Node in the current tree version
Node* node() const { return tree->nodeAt(cache->get(nodeIndex).nodeOffset); }
|
||||
|
||||
// Debug description of the cursor. Decodes the current item (via get()) when valid.
std::string toString() const {
	if (nodeIndex == -1) {
		return format("Cursor{nodeIndex=-1}");
	}
	return format("Cursor{item=%s indexItem=%s nodeIndex=%d decodedNode=%s node=%s ",
	              item.present() ? item.get().toString().c_str() : "<absent>",
	              get(cache->get(nodeIndex)).toString().c_str(),
	              nodeIndex,
	              cache->get(nodeIndex).toString().c_str(),
	              node()->toString(tree).c_str());
}
|
||||
|
||||
// A cursor is valid iff it references a DecodeCache entry
bool valid() const { return nodeIndex != -1; }
|
||||
|
||||
// Get T for Node n, and provide to n's delta the base and local decode cache entries to use/modify
// Recurses toward the root through base ancestors until a usable Partial (or a tree
// boundary) is found, populating Partials in the cache along the way.
const T get(DecodedNode& decoded) const {
	DeltaT& delta = decoded.node(tree)->delta(tree->largeNodes);

	// If this node's cached partial is populated, then the delta can create T from that alone
	if (decoded.partial.present()) {
		return delta.apply(decoded.partial.get());
	}

	// Otherwise, get the base T
	bool basePrev = delta.getPrefixSource();
	int baseIndex = basePrev ? decoded.leftParentIndex : decoded.rightParentIndex;

	// If baseOffset is -1, then base T is DecodeCache's lower or upper bound
	if (baseIndex == -1) {
		return delta.apply(cache->arena, basePrev ? cache->lowerBound : cache->upperBound, decoded.partial);
	}

	// Otherwise, get the base's decoded node
	DecodedNode& baseDecoded = cache->get(baseIndex);

	// If the base's partial is present, apply delta to it to get result
	if (baseDecoded.partial.present()) {
		return delta.apply(cache->arena, baseDecoded.partial.get(), decoded.partial);
	}

	// Otherwise apply delta to base T
	return delta.apply(cache->arena, get(baseDecoded), decoded.partial);
}
|
||||
|
||||
public:
|
||||
// Get the item at the cursor
// Behavior is undefined if the cursor is not valid.
// If the cursor is moved, the reference object returned will be modified to
// the cursor's new current item.
const T& get() const {
	// Decode lazily and memoize; item is cleared by the move/seek methods
	if (!item.present()) {
		item = get(cache->get(nodeIndex));
	}
	return item.get();
}
|
||||
|
||||
// Retarget this cursor at a different tree version which shares the same DecodeCache
void switchTree(DeltaTree2* newTree) { tree = newTree; }
|
||||
|
||||
// If the cursor is valid, return a reference to the cursor's internal T.
// Otherwise, returns a reference to the cache's upper boundary.
const T& getOrUpperBound() const { return valid() ? get() : cache->upperBound; }
|
||||
|
||||
// Cursors compare equal iff they reference the same DecodeCache entry
// (meaningful only for cursors sharing the same DecodeCache)
bool operator==(const Cursor& rhs) const { return nodeIndex == rhs.nodeIndex; }
bool operator!=(const Cursor& rhs) const { return nodeIndex != rhs.nodeIndex; }
|
||||
|
||||
// The seek methods, of the form seek[Less|Greater][orEqual](...) are very similar.
// They attempt to move the cursor to the [Greatest|Least] item, based on the name of the function.
// They will not "see" erased records.
// If successful, they return true; if not, they return false while making the cursor invalid.
// These methods forward arguments to the seek() overloads, see those for argument descriptions.
|
||||
// Move to the greatest item strictly less than the seek target.
// seek() stops at the match or at the would-be parent, so back up when at or past it.
template <typename... Args>
bool seekLessThan(Args... args) {
	int cmp = seek(args...);
	if (cmp < 0 || (cmp == 0 && nodeIndex != -1)) {
		movePrev();
	}
	return _hideDeletedBackward();
}
|
||||
|
||||
// Move to the greatest item less than or equal to the seek target.
template <typename... Args>
bool seekLessThanOrEqual(Args... args) {
	int cmp = seek(args...);
	if (cmp < 0) {
		movePrev();
	}
	return _hideDeletedBackward();
}
|
||||
|
||||
// Move to the least item strictly greater than the seek target.
template <typename... Args>
bool seekGreaterThan(Args... args) {
	int cmp = seek(args...);
	if (cmp > 0 || (cmp == 0 && nodeIndex != -1)) {
		moveNext();
	}
	return _hideDeletedForward();
}
|
||||
|
||||
// Move to the least item greater than or equal to the seek target.
template <typename... Args>
bool seekGreaterThanOrEqual(Args... args) {
	int cmp = seek(args...);
	if (cmp > 0) {
		moveNext();
	}
	return _hideDeletedForward();
}
|
||||
|
||||
// Get the right child index for parentIndex, creating the child's DecodedNode on
// first visit. Returns -1 if the node has no right child in this tree version.
int getRightChildIndex(int parentIndex) {
	DecodedNode* parent = &cache->get(parentIndex);

	// The cache may have a child index, but since cache covers multiple versions of a DeltaTree
	// it can't be used unless the node in the tree has a child.
	int childOffset = parent->node(tree)->getRightChildOffset(tree->largeNodes);

	if (childOffset == 0) {
		return -1;
	}

	// parent has this child so return the index if it is in DecodedNode
	if (parent->rightChildIndex != -1) {
		return parent->rightChildIndex;
	}

	// Create the child's DecodedNode and get its index
	// (this may reallocate decodedNodes, invalidating the parent pointer above)
	int childIndex = cache->emplace_new(childOffset, parentIndex, parent->rightParentIndex);

	// Set the index in the parent. The cache lookup is repeated because the cache has changed.
	cache->get(parentIndex).rightChildIndex = childIndex;
	return childIndex;
}
|
||||
|
||||
// Get the left child index for parentIndex, creating the child's DecodedNode on
// first visit. Returns -1 if the node has no left child in this tree version.
int getLeftChildIndex(int parentIndex) {
	DecodedNode* parent = &cache->get(parentIndex);

	// The cache may have a child index, but since cache covers multiple versions of a DeltaTree
	// it can't be used unless the node in the tree has a child.
	int childOffset = parent->node(tree)->getLeftChildOffset(tree->largeNodes);

	if (childOffset == 0) {
		return -1;
	}

	// parent has this child so return the index if it is in DecodedNode
	if (parent->leftChildIndex != -1) {
		return parent->leftChildIndex;
	}

	// Create the child's DecodedNode and get its index
	// (this may reallocate decodedNodes, invalidating the parent pointer above)
	int childIndex = cache->emplace_new(childOffset, parent->leftParentIndex, parentIndex);

	// Set the index in the parent. The cache lookup is repeated because the cache has changed.
	cache->get(parentIndex).leftChildIndex = childIndex;
	return childIndex;
}
|
||||
|
||||
// seek() moves the cursor to a node containing s or the node that would be the parent of s if s were to be
// added to the tree. If the tree was empty, the cursor will be invalid and the return value will be 0.
// Otherwise, returns the result of s.compare(item at cursor position)
// Does not skip/avoid deleted nodes.
int seek(const T& s, int skipLen = 0) {
	nodeIndex = -1;
	item.reset();
	deltatree_printf("seek(%s) start %s\n", s.toString().c_str(), toString().c_str());
	int nIndex = rootIndex();
	int cmp = 0;

	// Standard binary search descent from the root
	while (nIndex != -1) {
		nodeIndex = nIndex;
		item.reset();
		cmp = s.compare(get(), skipLen);
		deltatree_printf("seek(%s) loop cmp=%d %s\n", s.toString().c_str(), cmp, toString().c_str());
		if (cmp == 0) {
			break;
		}

		if (cmp > 0) {
			nIndex = getRightChildIndex(nIndex);
		} else {
			nIndex = getLeftChildIndex(nIndex);
		}
	}

	return cmp;
}
|
||||
|
||||
// Move to the least non-deleted item in the tree. Returns false (cursor invalid) if none.
bool moveFirst() {
	nodeIndex = -1;
	item.reset();
	int nIndex = rootIndex();
	deltatree_printf("moveFirst start %s\n", toString().c_str());
	// Descend as far left as possible
	while (nIndex != -1) {
		nodeIndex = nIndex;
		deltatree_printf("moveFirst moved %s\n", toString().c_str());
		nIndex = getLeftChildIndex(nIndex);
	}
	return _hideDeletedForward();
}
|
||||
|
||||
// Move to the greatest non-deleted item in the tree. Returns false (cursor invalid) if none.
bool moveLast() {
	nodeIndex = -1;
	item.reset();
	int nIndex = rootIndex();
	deltatree_printf("moveLast start %s\n", toString().c_str());
	// Descend as far right as possible
	while (nIndex != -1) {
		nodeIndex = nIndex;
		deltatree_printf("moveLast moved %s\n", toString().c_str());
		nIndex = getRightChildIndex(nIndex);
	}
	return _hideDeletedBackward();
}
|
||||
|
||||
// Try to move to next node, sees deleted nodes.
// In-order successor: leftmost node of the right subtree, or else the right parent
// (which may be -1, leaving the cursor invalid at the end of the tree).
void _moveNext() {
	deltatree_printf("_moveNext start %s\n", toString().c_str());
	item.reset();
	// Try to go right
	int nIndex = getRightChildIndex(nodeIndex);

	// If we couldn't go right, then the answer is our next ancestor
	if (nIndex == -1) {
		nodeIndex = cache->get(nodeIndex).rightParentIndex;
		deltatree_printf("_moveNext move1 %s\n", toString().c_str());
	} else {
		// Go left as far as possible
		do {
			nodeIndex = nIndex;
			deltatree_printf("_moveNext move2 %s\n", toString().c_str());
			nIndex = getLeftChildIndex(nodeIndex);
		} while (nIndex != -1);
	}
}
|
||||
|
||||
// Try to move to previous node, sees deleted nodes.
// In-order predecessor: rightmost node of the left subtree, or else the left parent
// (which may be -1, leaving the cursor invalid at the start of the tree).
void _movePrev() {
	deltatree_printf("_movePrev start %s\n", toString().c_str());
	item.reset();
	// Try to go left
	int nIndex = getLeftChildIndex(nodeIndex);
	// If we couldn't go left, then the answer is our prev ancestor
	if (nIndex == -1) {
		nodeIndex = cache->get(nodeIndex).leftParentIndex;
		deltatree_printf("_movePrev move1 %s\n", toString().c_str());
	} else {
		// Go right as far as possible
		do {
			nodeIndex = nIndex;
			deltatree_printf("_movePrev move2 %s\n", toString().c_str());
			nIndex = getRightChildIndex(nodeIndex);
		} while (nIndex != -1);
	}
}
|
||||
|
||||
// Move forward to the next non-deleted item; returns false (cursor invalid) at the end
bool moveNext() {
	_moveNext();
	return _hideDeletedForward();
}
|
||||
|
||||
// Move backward to the previous non-deleted item; returns false (cursor invalid) at the start
bool movePrev() {
	_movePrev();
	return _hideDeletedBackward();
}
|
||||
|
||||
// Direct access to the current node's DeltaT within the tree
DeltaT& getDelta() const { return cache->get(nodeIndex).node(tree)->delta(tree->largeNodes); }
|
||||
|
||||
// True if the current item is marked deleted in the tree
bool isErased() const { return getDelta().getDeleted(); }
|
||||
|
||||
// Erase current item by setting its deleted flag to true.
// Tree header is updated if a change is made.
// Cursor is then moved forward to the next non-deleted node.
void erase() {
	auto& delta = getDelta();
	if (!delta.getDeleted()) {
		delta.setDeleted(true);
		--tree->numItems;
		// Account for the whole node (delta plus offsets header) as dead space
		tree->nodeBytesDeleted += (delta.size() + Node::headerSize(tree->largeNodes));
	}
	moveNext();
}
|
||||
|
||||
// Erase k by setting its deleted flag to true. Returns true only if k existed
// (present and not already deleted). Uses a scratch cursor so *this keeps its position.
bool erase(const T& k, int skipLen = 0) {
	Cursor c(cache, tree);
	if (c.seek(k, skipLen) == 0 && !c.isErased()) {
		c.erase();
		return true;
	}
	return false;
}
|
||||
|
||||
// Try to insert k into the DeltaTree, updating byte counts and initialHeight if they
|
||||
// have changed (they won't if k already exists in the tree but was deleted).
|
||||
// Returns true if successful, false if k does not fit in the space available
|
||||
// or if k is already in the tree (and was not already deleted).
|
||||
// Insertion on an empty tree returns false as well.
|
||||
// Insert does NOT change the cursor position.
|
||||
bool insert(const T& k, int skipLen = 0, int maxHeightAllowed = std::numeric_limits<int>::max()) {
|
||||
deltatree_printf("insert %s\n", k.toString().c_str());
|
||||
|
||||
int nIndex = rootIndex();
|
||||
int parentIndex = nIndex;
|
||||
DecodedNode* parentDecoded;
|
||||
// Result of comparing node at parentIndex
|
||||
int cmp = 0;
|
||||
// Height of the inserted node
|
||||
int height = 0;
|
||||
|
||||
// Find the parent to add the node to
|
||||
// This is just seek but modifies parentIndex instead of nodeIndex and tracks the insertion height
|
||||
deltatree_printf(
|
||||
"insert(%s) start %s\n", k.toString().c_str(), Cursor(cache, tree, parentIndex).toString().c_str());
|
||||
while (nIndex != -1) {
|
||||
++height;
|
||||
parentIndex = nIndex;
|
||||
parentDecoded = &cache->get(parentIndex);
|
||||
cmp = k.compare(get(*parentDecoded), skipLen);
|
||||
deltatree_printf("insert(%s) moved cmp=%d %s\n",
|
||||
k.toString().c_str(),
|
||||
cmp,
|
||||
Cursor(cache, tree, parentIndex).toString().c_str());
|
||||
|
||||
if (cmp == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (cmp > 0) {
|
||||
deltatree_printf("insert(%s) move right\n", k.toString().c_str());
|
||||
nIndex = getRightChildIndex(nIndex);
|
||||
} else {
|
||||
deltatree_printf("insert(%s) move left\n", k.toString().c_str());
|
||||
nIndex = getLeftChildIndex(nIndex);
|
||||
}
|
||||
}
|
||||
|
||||
// If the item is found, mark it erased if it isn't already
|
||||
if (cmp == 0) {
|
||||
DeltaT& delta = tree->nodeAt(parentDecoded->nodeOffset)->delta(tree->largeNodes);
|
||||
if (delta.getDeleted()) {
|
||||
delta.setDeleted(false);
|
||||
++tree->numItems;
|
||||
tree->nodeBytesDeleted -= (delta.size() + Node::headerSize(tree->largeNodes));
|
||||
deltatree_printf("insert(%s) deleted item restored %s\n",
|
||||
k.toString().c_str(),
|
||||
Cursor(cache, tree, parentIndex).toString().c_str());
|
||||
return true;
|
||||
}
|
||||
deltatree_printf("insert(%s) item exists %s\n",
|
||||
k.toString().c_str(),
|
||||
Cursor(cache, tree, parentIndex).toString().c_str());
|
||||
return false;
|
||||
}
|
||||
|
||||
// If the tree was empty or the max insertion height is exceeded then fail
|
||||
if (parentIndex == -1 || height > maxHeightAllowed) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Find the base to borrow from, and see if the resulting delta fits into the tree
|
||||
int leftBaseIndex, rightBaseIndex;
|
||||
bool addingRight = cmp > 0;
|
||||
if (addingRight) {
|
||||
leftBaseIndex = parentIndex;
|
||||
rightBaseIndex = parentDecoded->rightParentIndex;
|
||||
} else {
|
||||
leftBaseIndex = parentDecoded->leftParentIndex;
|
||||
rightBaseIndex = parentIndex;
|
||||
}
|
||||
|
||||
T leftBase = leftBaseIndex == -1 ? cache->lowerBound : get(cache->get(leftBaseIndex));
|
||||
T rightBase = rightBaseIndex == -1 ? cache->upperBound : get(cache->get(rightBaseIndex));
|
||||
|
||||
int common = leftBase.getCommonPrefixLen(rightBase, skipLen);
|
||||
int commonWithLeftParent = k.getCommonPrefixLen(leftBase, common);
|
||||
int commonWithRightParent = k.getCommonPrefixLen(rightBase, common);
|
||||
bool borrowFromLeft = commonWithLeftParent >= commonWithRightParent;
|
||||
|
||||
const T* base;
|
||||
int commonPrefix;
|
||||
if (borrowFromLeft) {
|
||||
base = &leftBase;
|
||||
commonPrefix = commonWithLeftParent;
|
||||
} else {
|
||||
base = &rightBase;
|
||||
commonPrefix = commonWithRightParent;
|
||||
}
|
||||
|
||||
int deltaSize = k.deltaSize(*base, commonPrefix, false);
|
||||
int nodeSpace = deltaSize + Node::headerSize(tree->largeNodes);
|
||||
|
||||
if (nodeSpace > tree->nodeBytesFree) {
|
||||
return false;
|
||||
}
|
||||
|
||||
int childOffset = tree->size();
|
||||
Node* childNode = tree->nodeAt(childOffset);
|
||||
childNode->setLeftChildOffset(tree->largeNodes, 0);
|
||||
childNode->setRightChildOffset(tree->largeNodes, 0);
|
||||
|
||||
// Create the decoded node and link it to the parent
|
||||
// Link the parent's decodednode to the child's decodednode
|
||||
// Link the parent node in the tree to the new child node
|
||||
// true if node is being added to right child
|
||||
int childIndex = cache->emplace_new(childOffset, leftBaseIndex, rightBaseIndex);
|
||||
|
||||
// Get a new parentDecoded pointer as the cache may have changed allocations
|
||||
parentDecoded = &cache->get(parentIndex);
|
||||
|
||||
if (addingRight) {
|
||||
// Adding child to right of parent
|
||||
parentDecoded->rightChildIndex = childIndex;
|
||||
parentDecoded->node(tree)->setRightChildOffset(tree->largeNodes, childOffset);
|
||||
} else {
|
||||
// Adding child to left of parent
|
||||
parentDecoded->leftChildIndex = childIndex;
|
||||
parentDecoded->node(tree)->setLeftChildOffset(tree->largeNodes, childOffset);
|
||||
}
|
||||
|
||||
// Give k opportunity to populate its cache partial record
|
||||
k.updateCache(cache->get(childIndex).partial, cache->arena);
|
||||
|
||||
DeltaT& childDelta = childNode->delta(tree->largeNodes);
|
||||
deltatree_printf("insert(%s) writing delta from %s\n", k.toString().c_str(), base->toString().c_str());
|
||||
int written = k.writeDelta(childDelta, *base, commonPrefix);
|
||||
ASSERT(deltaSize == written);
|
||||
childDelta.setPrefixSource(borrowFromLeft);
|
||||
|
||||
tree->nodeBytesUsed += nodeSpace;
|
||||
tree->nodeBytesFree -= nodeSpace;
|
||||
++tree->numItems;
|
||||
|
||||
// Update max height of the tree if necessary
|
||||
if (height > tree->maxHeight) {
|
||||
tree->maxHeight = height;
|
||||
}
|
||||
|
||||
deltatree_printf("insert(%s) done parent=%s\n",
|
||||
k.toString().c_str(),
|
||||
Cursor(cache, tree, parentIndex).toString().c_str());
|
||||
deltatree_printf("insert(%s) done child=%s\n",
|
||||
k.toString().c_str(),
|
||||
Cursor(cache, tree, childIndex).toString().c_str());
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private:
|
||||
bool _hideDeletedBackward() {
|
||||
while (nodeIndex != -1 && getDelta().getDeleted()) {
|
||||
_movePrev();
|
||||
}
|
||||
return nodeIndex != -1;
|
||||
}
|
||||
|
||||
bool _hideDeletedForward() {
|
||||
while (nodeIndex != -1 && getDelta().getDeleted()) {
|
||||
_moveNext();
|
||||
}
|
||||
return nodeIndex != -1;
|
||||
}
|
||||
};
|
||||
|
||||
// Returns number of bytes written
|
||||
int build(int spaceAvailable, const T* begin, const T* end, const T* lowerBound, const T* upperBound) {
|
||||
largeNodes = spaceAvailable > SmallSizeLimit;
|
||||
int count = end - begin;
|
||||
numItems = count;
|
||||
nodeBytesDeleted = 0;
|
||||
initialHeight = (uint8_t)log2(count) + 1;
|
||||
maxHeight = 0;
|
||||
|
||||
// The boundary leading to the new page acts as the last time we branched right
|
||||
if (count > 0) {
|
||||
nodeBytesUsed = buildSubtree(
|
||||
*root(), begin, end, lowerBound, upperBound, lowerBound->getCommonPrefixLen(*upperBound, 0));
|
||||
} else {
|
||||
nodeBytesUsed = 0;
|
||||
}
|
||||
nodeBytesFree = spaceAvailable - size();
|
||||
return size();
|
||||
}
|
||||
|
||||
private:
|
||||
int buildSubtree(Node& node,
|
||||
const T* begin,
|
||||
const T* end,
|
||||
const T* leftParent,
|
||||
const T* rightParent,
|
||||
int subtreeCommon) {
|
||||
|
||||
int count = end - begin;
|
||||
|
||||
// Find key to be stored in root
|
||||
int mid = perfectSubtreeSplitPointCached(count);
|
||||
const T& item = begin[mid];
|
||||
|
||||
int commonWithPrev = item.getCommonPrefixLen(*leftParent, subtreeCommon);
|
||||
int commonWithNext = item.getCommonPrefixLen(*rightParent, subtreeCommon);
|
||||
|
||||
bool prefixSourcePrev;
|
||||
int commonPrefix;
|
||||
const T* base;
|
||||
if (commonWithPrev >= commonWithNext) {
|
||||
prefixSourcePrev = true;
|
||||
commonPrefix = commonWithPrev;
|
||||
base = leftParent;
|
||||
} else {
|
||||
prefixSourcePrev = false;
|
||||
commonPrefix = commonWithNext;
|
||||
base = rightParent;
|
||||
}
|
||||
|
||||
int deltaSize = item.writeDelta(node.delta(largeNodes), *base, commonPrefix);
|
||||
node.delta(largeNodes).setPrefixSource(prefixSourcePrev);
|
||||
|
||||
// Continue writing after the serialized Delta.
|
||||
uint8_t* wptr = (uint8_t*)&node.delta(largeNodes) + deltaSize;
|
||||
|
||||
int leftChildOffset;
|
||||
// Serialize left subtree
|
||||
if (count > 1) {
|
||||
leftChildOffset = wptr - (uint8_t*)this;
|
||||
deltatree_printf("%p: offset=%d count=%d serialize left subtree leftChildOffset=%d\n",
|
||||
this,
|
||||
nodeOffset(&node),
|
||||
count,
|
||||
leftChildOffset);
|
||||
|
||||
wptr += buildSubtree(*(Node*)wptr, begin, begin + mid, leftParent, &item, commonWithPrev);
|
||||
} else {
|
||||
leftChildOffset = 0;
|
||||
}
|
||||
|
||||
int rightChildOffset;
|
||||
// Serialize right subtree
|
||||
if (count > 2) {
|
||||
rightChildOffset = wptr - (uint8_t*)this;
|
||||
deltatree_printf("%p: offset=%d count=%d serialize right subtree rightChildOffset=%d\n",
|
||||
this,
|
||||
nodeOffset(&node),
|
||||
count,
|
||||
rightChildOffset);
|
||||
|
||||
wptr += buildSubtree(*(Node*)wptr, begin + mid + 1, end, &item, rightParent, commonWithNext);
|
||||
} else {
|
||||
rightChildOffset = 0;
|
||||
}
|
||||
|
||||
node.setLeftChildOffset(largeNodes, leftChildOffset);
|
||||
node.setRightChildOffset(largeNodes, rightChildOffset);
|
||||
|
||||
deltatree_printf("%p: Serialized %s as %s\n", this, item.toString().c_str(), node.toString(this).c_str());
|
||||
|
||||
return wptr - (uint8_t*)&node;
|
||||
}
|
||||
};
|
||||
|
|
|
@ -128,10 +128,7 @@ public:
|
|||
|
||||
class IPagerSnapshot {
|
||||
public:
|
||||
virtual Future<Reference<const ArenaPage>> getPhysicalPage(LogicalPageID pageID,
|
||||
bool cacheable,
|
||||
bool nohit,
|
||||
bool* fromCache = nullptr) = 0;
|
||||
virtual Future<Reference<const ArenaPage>> getPhysicalPage(LogicalPageID pageID, bool cacheable, bool nohit) = 0;
|
||||
virtual bool tryEvictPage(LogicalPageID id) = 0;
|
||||
virtual Version getVersion() const = 0;
|
||||
|
||||
|
@ -191,10 +188,7 @@ public:
|
|||
// Cacheable indicates that the page should be added to the page cache (if applicable?) as a result of this read.
|
||||
// NoHit indicates that the read should not be considered a cache hit, such as when preloading pages that are
|
||||
// considered likely to be needed soon.
|
||||
virtual Future<Reference<ArenaPage>> readPage(LogicalPageID pageID,
|
||||
bool cacheable = true,
|
||||
bool noHit = false,
|
||||
bool* fromCache = nullptr) = 0;
|
||||
virtual Future<Reference<ArenaPage>> readPage(LogicalPageID pageID, bool cacheable = true, bool noHit = false) = 0;
|
||||
virtual Future<Reference<ArenaPage>> readExtent(LogicalPageID pageID) = 0;
|
||||
virtual void releaseExtentReadLock() = 0;
|
||||
|
||||
|
|
|
@ -1,80 +0,0 @@
|
|||
/*
|
||||
* IVersionedStore.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef FDBSERVER_IVERSIONEDSTORE_H
|
||||
#define FDBSERVER_IVERSIONEDSTORE_H
|
||||
#pragma once
|
||||
|
||||
#include "fdbserver/IKeyValueStore.h"
|
||||
|
||||
#include "flow/flow.h"
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
|
||||
class IStoreCursor {
|
||||
public:
|
||||
virtual Future<Void> findEqual(KeyRef key) = 0;
|
||||
virtual Future<Void> findFirstEqualOrGreater(KeyRef key, int prefetchBytes = 0) = 0;
|
||||
virtual Future<Void> findLastLessOrEqual(KeyRef key, int prefetchBytes = 0) = 0;
|
||||
virtual Future<Void> next() = 0;
|
||||
virtual Future<Void> prev() = 0;
|
||||
|
||||
virtual bool isValid() = 0;
|
||||
virtual KeyRef getKey() = 0;
|
||||
virtual ValueRef getValue() = 0;
|
||||
|
||||
virtual void addref() = 0;
|
||||
virtual void delref() = 0;
|
||||
};
|
||||
|
||||
class IVersionedStore : public IClosable {
|
||||
public:
|
||||
virtual KeyValueStoreType getType() const = 0;
|
||||
virtual bool supportsMutation(int op) const = 0; // If this returns true, then mutate(op, ...) may be called
|
||||
virtual StorageBytes getStorageBytes() const = 0;
|
||||
|
||||
// Writes are provided in an ordered stream.
|
||||
// A write is considered part of (a change leading to) the version determined by the previous call to
|
||||
// setWriteVersion() A write shall not become durable until the following call to commit() begins, and shall be
|
||||
// durable once the following call to commit() returns
|
||||
virtual void set(KeyValueRef keyValue) = 0;
|
||||
virtual void clear(KeyRangeRef range) = 0;
|
||||
virtual void mutate(int op, StringRef param1, StringRef param2) = 0;
|
||||
virtual void setWriteVersion(Version) = 0; // The write version must be nondecreasing
|
||||
virtual void setOldestVersion(Version v) = 0; // Set oldest readable version to be used in next commit
|
||||
virtual Version getOldestVersion() const = 0; // Get oldest readable version
|
||||
virtual Future<Void> commit() = 0;
|
||||
|
||||
virtual Future<Void> init() = 0;
|
||||
virtual Version getLatestVersion() const = 0;
|
||||
|
||||
// readAtVersion() may only be called on a version which has previously been passed to setWriteVersion() and never
|
||||
// previously passed
|
||||
// to forgetVersion. The returned results when violating this precondition are unspecified; the store is not
|
||||
// required to be able to detect violations.
|
||||
// The returned read cursor provides a consistent snapshot of the versioned store, corresponding to all the writes
|
||||
// done with write versions less
|
||||
// than or equal to the given version.
|
||||
// If readAtVersion() is called on the *current* write version, the given read cursor MAY reflect subsequent writes
|
||||
// at the same
|
||||
// write version, OR it may represent a snapshot as of the call to readAtVersion().
|
||||
virtual Reference<IStoreCursor> readAtVersion(Version) = 0;
|
||||
};
|
||||
|
||||
#endif
|
|
@ -268,10 +268,6 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
|
|||
|
||||
init( DD_REMOVE_STORE_ENGINE_DELAY, 60.0 ); if( randomize && BUGGIFY ) DD_REMOVE_STORE_ENGINE_DELAY = deterministicRandom()->random01() * 60.0;
|
||||
|
||||
// Redwood Storage Engine
|
||||
init( PREFIX_TREE_IMMEDIATE_KEY_SIZE_LIMIT, 30 );
|
||||
init( PREFIX_TREE_IMMEDIATE_KEY_SIZE_MIN, 0 );
|
||||
|
||||
// KeyValueStore SQLITE
|
||||
init( CLEAR_BUFFER_SIZE, 20000 );
|
||||
init( READ_VALUE_TIME_ESTIMATE, .00005 );
|
||||
|
@ -714,9 +710,8 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
|
|||
init( REDWOOD_DEFAULT_PAGE_SIZE, 4096 );
|
||||
init( REDWOOD_DEFAULT_EXTENT_SIZE, 32 * 1024 * 1024 );
|
||||
init( REDWOOD_DEFAULT_EXTENT_READ_SIZE, 1024 * 1024 );
|
||||
init( REDWOOD_KVSTORE_CONCURRENT_READS, 64 );
|
||||
init( REDWOOD_COMMIT_CONCURRENT_READS, 64 );
|
||||
init( REDWOOD_EXTENT_CONCURRENT_READS, 4 );
|
||||
init( REDWOOD_KVSTORE_CONCURRENT_READS, 64 );
|
||||
init( REDWOOD_PAGE_REBUILD_MAX_SLACK, 0.33 );
|
||||
init( REDWOOD_LAZY_CLEAR_BATCH_SIZE_PAGES, 10 );
|
||||
init( REDWOOD_LAZY_CLEAR_MIN_PAGES, 0 );
|
||||
|
|
|
@ -222,10 +222,6 @@ public:
|
|||
double DD_FAILURE_TIME;
|
||||
double DD_ZERO_HEALTHY_TEAM_DELAY;
|
||||
|
||||
// Redwood Storage Engine
|
||||
int PREFIX_TREE_IMMEDIATE_KEY_SIZE_LIMIT;
|
||||
int PREFIX_TREE_IMMEDIATE_KEY_SIZE_MIN;
|
||||
|
||||
// KeyValueStore SQLITE
|
||||
int CLEAR_BUFFER_SIZE;
|
||||
double READ_VALUE_TIME_ESTIMATE;
|
||||
|
@ -646,9 +642,8 @@ public:
|
|||
int REDWOOD_DEFAULT_PAGE_SIZE; // Page size for new Redwood files
|
||||
int REDWOOD_DEFAULT_EXTENT_SIZE; // Extent size for new Redwood files
|
||||
int REDWOOD_DEFAULT_EXTENT_READ_SIZE; // Extent read size for Redwood files
|
||||
int REDWOOD_KVSTORE_CONCURRENT_READS; // Max number of simultaneous point or range reads in progress.
|
||||
int REDWOOD_COMMIT_CONCURRENT_READS; // Max number of concurrent reads done to support commit operations
|
||||
int REDWOOD_EXTENT_CONCURRENT_READS; // Max number of simultaneous extent disk reads in progress.
|
||||
int REDWOOD_KVSTORE_CONCURRENT_READS; // Max number of simultaneous point or range reads in progress.
|
||||
double REDWOOD_PAGE_REBUILD_MAX_SLACK; // When rebuilding pages, max slack to allow in page
|
||||
int REDWOOD_LAZY_CLEAR_BATCH_SIZE_PAGES; // Number of pages to try to pop from the lazy delete queue and process at
|
||||
// once
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -1271,6 +1271,40 @@ Future<T> waitOrError(Future<T> f, Future<Void> errorSignal) {
|
|||
}
|
||||
}
|
||||
|
||||
// A low-overhead FIFO mutex made with no internal queue structure (no list, deque, vector, etc)
|
||||
// The lock is implemented as a Promise<Void>, which is returned to callers in a convenient wrapper
|
||||
// called Lock.
|
||||
//
|
||||
// Usage:
|
||||
// Lock lock = wait(mutex.take());
|
||||
// lock.release(); // Next waiter will get the lock, OR
|
||||
// lock.error(e); // Next waiter will get e, future waiters will see broken_promise
|
||||
// lock = Lock(); // Or let Lock and any copies go out of scope. All waiters will see broken_promise.
|
||||
struct FlowMutex {
|
||||
FlowMutex() { lastPromise.send(Void()); }
|
||||
|
||||
bool available() { return lastPromise.isSet(); }
|
||||
|
||||
struct Lock {
|
||||
void release() { promise.send(Void()); }
|
||||
|
||||
void error(Error e = broken_promise()) { promise.sendError(e); }
|
||||
|
||||
// This is exposed in case the caller wants to use/copy it directly
|
||||
Promise<Void> promise;
|
||||
};
|
||||
|
||||
Future<Lock> take() {
|
||||
Lock newLock;
|
||||
Future<Lock> f = lastPromise.isSet() ? newLock : tag(lastPromise.getFuture(), newLock);
|
||||
lastPromise = newLock.promise;
|
||||
return f;
|
||||
}
|
||||
|
||||
private:
|
||||
Promise<Void> lastPromise;
|
||||
};
|
||||
|
||||
struct FlowLock : NonCopyable, public ReferenceCounted<FlowLock> {
|
||||
// FlowLock implements a nonblocking critical section: there can be only a limited number of clients executing code
|
||||
// between wait(take()) and release(). Not thread safe. take() returns only when the number of holders of the lock
|
||||
|
|
Loading…
Reference in New Issue