Merge pull request #6574 from sfc-gh-satherton/redwood-rare-bugs

Rare correctness bug fixes in Redwood
This commit is contained in:
Steve Atherton 2022-04-01 16:40:22 -07:00 committed by GitHub
commit 6eb1c2ae48
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 313 additions and 162 deletions

View File

@ -1373,7 +1373,7 @@ struct StorageMetadataType {
StorageMetadataType() : createdTime(0) {}
StorageMetadataType(uint64_t t) : createdTime(t) {}
static uint64_t currentTime() { return g_network->timer() * 1e9; }
static uint64_t currentTime() { return g_network->timer_int(); }
// To change this serialization, ProtocolVersion::StorageMetadata must be updated, and downgrades need
// to be considered

View File

@ -296,7 +296,7 @@ Future<Void> StorageWiggler::restoreStats() {
return map(readFuture, assignFunc);
}
Future<Void> StorageWiggler::startWiggle() {
metrics.last_wiggle_start = timer_int();
metrics.last_wiggle_start = g_network->timer_int();
if (shouldStartNewRound()) {
metrics.last_round_start = metrics.last_wiggle_start;
}
@ -304,7 +304,7 @@ Future<Void> StorageWiggler::startWiggle() {
}
Future<Void> StorageWiggler::finishWiggle() {
metrics.last_wiggle_finish = timer_int();
metrics.last_wiggle_finish = g_network->timer_int();
metrics.finished_wiggle += 1;
auto duration = metrics.last_wiggle_finish - metrics.last_wiggle_start;
metrics.smoothed_wiggle_duration.setTotal((double)duration);

View File

@ -1077,7 +1077,7 @@ public:
Node* node(DeltaTree2* tree) const { return tree->nodeAt(nodeOffset); }
std::string toString() {
std::string toString() const {
return format("DecodedNode{nodeOffset=%d leftChildIndex=%d rightChildIndex=%d leftParentIndex=%d "
"rightParentIndex=%d}",
(int)nodeOffset,
@ -1155,6 +1155,19 @@ public:
arena = a;
updateUsedMemory();
}
std::string toString() const {
std::string s = format("DecodeCache{%p\n", this);
s += format("upperBound %s\n", upperBound.toString().c_str());
s += format("lowerBound %s\n", lowerBound.toString().c_str());
s += format("arenaSize %d\n", arena.getSize());
s += format("decodedNodes %d {\n", decodedNodes.size());
for (auto const& n : decodedNodes) {
s += format(" %s\n", n.toString().c_str());
}
s += format("}}\n");
return s;
}
};
// Cursor provides a way to seek into a DeltaTree and iterate over its contents
@ -1686,7 +1699,7 @@ public:
int count = end - begin;
numItems = count;
nodeBytesDeleted = 0;
initialHeight = (uint8_t)log2(count) + 1;
initialHeight = count == 0 ? 0 : (uint8_t)log2(count) + 1;
maxHeight = 0;
// The boundary leading to the new page acts as the last time we branched right

View File

@ -62,7 +62,7 @@
{ \
std::string prefix = format("%s %f %04d ", g_network->getLocalAddress().toString().c_str(), now(), __LINE__); \
std::string msg = format(__VA_ARGS__); \
writePrefixedLines(debug_printf_stream, prefix, msg); \
fputs(addPrefix(prefix, msg).c_str(), debug_printf_stream); \
fflush(debug_printf_stream); \
}
@ -73,11 +73,13 @@
std::string prefix = \
format("%s %f %04d ", g_network->getLocalAddress().toString().c_str(), now(), __LINE__); \
std::string msg = format(__VA_ARGS__); \
writePrefixedLines(debug_printf_stream, prefix, msg); \
fputs(addPrefix(prefix, msg).c_str(), debug_printf_stream); \
fflush(debug_printf_stream); \
} \
}
#define debug_print(str) debug_printf("%s\n", str.c_str())
#define debug_print_always(str) debug_printf_always("%s\n", str.c_str())
#define debug_printf_noop(...)
#if defined(NO_INTELLISENSE)
@ -97,13 +99,18 @@
#define TRACE \
debug_printf_always("%s: %s line %d %s\n", __FUNCTION__, __FILE__, __LINE__, platform::get_backtrace().c_str());
// Writes prefix:line for each line in msg to fout
void writePrefixedLines(FILE* fout, std::string prefix, std::string msg) {
StringRef m = msg;
// Returns a string where every line in lines is prefixed with prefix
std::string addPrefix(std::string prefix, std::string lines) {
StringRef m = lines;
std::string s;
while (m.size() != 0) {
StringRef line = m.eat("\n");
fprintf(fout, "%s %s\n", prefix.c_str(), line.toString().c_str());
s += prefix;
s += ' ';
s += line.toString();
s += '\n';
}
return s;
}
#define PRIORITYMULTILOCK_DEBUG 0
@ -917,12 +924,15 @@ public:
}
}
// If readNext() cannot complete immediately, it will route to here
// The mutex will be taken if locked is false
// The next page will be waited for if load is true
// If readNext() cannot complete immediately because it must wait for IO, it will route to here.
// The purpose of this function is to serialize simultaneous readers on self while letting the
// common case (>99.8% of the time) be handled with low overhead by the non-actor readNext() function.
//
// The mutex will be taken if locked is false.
// The next page will be waited for if load is true.
// Only mutex holders will wait on the page read.
ACTOR static Future<Optional<T>> waitThenReadNext(Cursor* self,
Optional<T> upperBound,
Optional<T> inclusiveMaximum,
FlowMutex::Lock* lock,
bool load) {
state FlowMutex::Lock localLock;
@ -940,7 +950,7 @@ public:
wait(success(self->nextPageReader));
}
state Optional<T> result = wait(self->readNext(upperBound, &localLock));
state Optional<T> result = wait(self->readNext(inclusiveMaximum, &localLock));
// If a lock was not passed in, so this actor locked the mutex above, then unlock it
if (lock == nullptr) {
@ -959,10 +969,12 @@ public:
return result;
}
// Read the next item at the cursor (if < upperBound), moving to a new page first if the current page is
// exhausted If locked is true, this call owns the mutex, which would have been locked by readNext() before a
// recursive call
Future<Optional<T>> readNext(const Optional<T>& upperBound = {}, FlowMutex::Lock* lock = nullptr) {
// Read the next item from the cursor, possibly moving to and waiting for a new page if the prior page was
// exhausted. If the item is <= inclusiveMaximum, then return it after advancing the cursor to the next item.
// Otherwise, return nothing and do not advance the cursor.
// If locked is true, this call owns the mutex, which would have been locked by readNext() before a recursive
// call. See waitThenReadNext() for more detail.
Future<Optional<T>> readNext(const Optional<T>& inclusiveMaximum = {}, FlowMutex::Lock* lock = nullptr) {
if ((mode != POP && mode != READONLY) || pageID == invalidLogicalPageID || pageID == endPageID) {
debug_printf("FIFOQueue::Cursor(%s) readNext returning nothing\n", toString().c_str());
return Optional<T>();
@ -970,7 +982,7 @@ public:
// If we don't have a lock and the mutex isn't available then acquire it
if (lock == nullptr && isBusy()) {
return waitThenReadNext(this, upperBound, lock, false);
return waitThenReadNext(this, inclusiveMaximum, lock, false);
}
// We now know pageID is valid and should be used, but page might not point to it yet
@ -986,7 +998,7 @@ public:
}
if (!nextPageReader.isReady()) {
return waitThenReadNext(this, upperBound, lock, true);
return waitThenReadNext(this, inclusiveMaximum, lock, true);
}
page = nextPageReader.get();
@ -1007,11 +1019,11 @@ public:
int bytesRead;
const T result = Codec::readFromBytes(p->begin() + offset, bytesRead);
if (upperBound.present() && upperBound.get() < result) {
if (inclusiveMaximum.present() && inclusiveMaximum.get() < result) {
debug_printf("FIFOQueue::Cursor(%s) not popping %s, exceeds upper bound %s\n",
toString().c_str(),
::toString(result).c_str(),
::toString(upperBound.get()).c_str());
::toString(inclusiveMaximum.get()).c_str());
return Optional<T>();
}
@ -1059,10 +1071,10 @@ public:
}
}
debug_printf("FIFOQueue(%s) %s(upperBound=%s) -> %s\n",
debug_printf("FIFOQueue(%s) %s(inclusiveMaximum=%s) -> %s\n",
queue->name.c_str(),
(mode == POP ? "pop" : "peek"),
::toString(upperBound).c_str(),
::toString(inclusiveMaximum).c_str(),
::toString(result).c_str());
return Optional<T>(result);
}
@ -1290,8 +1302,8 @@ public:
Future<Optional<T>> peek() { return peek_impl(this); }
// Pop the next item on front of queue if it is <= upperBound or if upperBound is not present
Future<Optional<T>> pop(Optional<T> upperBound = {}) { return headReader.readNext(upperBound); }
// Pop the next item on front of queue if it is <= inclusiveMaximum or if inclusiveMaximum is not present
Future<Optional<T>> pop(Optional<T> inclusiveMaximum = {}) { return headReader.readNext(inclusiveMaximum); }
QueueState getState() const {
QueueState s;
@ -1484,8 +1496,8 @@ public:
int64_t numEntries;
int dataBytesPerPage;
int pagesPerExtent;
bool usesExtents;
bool tailPageNewExtent;
bool usesExtents = false;
bool tailPageNewExtent = false;
LogicalPageID prevExtentEndPageID;
Cursor headReader;
@ -2758,6 +2770,8 @@ public:
return f;
}
// Free pageID as of version v. This means that once the oldest readable pager snapshot is at version v, pageID is
// no longer in use by any structure so it can be used to write new data.
void freeUnmappedPage(PhysicalPageID pageID, Version v) {
// If v is older than the oldest version still readable then mark pageID as free as of the next commit
if (v < effectiveOldestVersion()) {
@ -2823,7 +2837,7 @@ public:
void freePage(LogicalPageID pageID, Version v) override {
// If pageID has been remapped, then it can't be freed until all existing remaps for that page have been undone,
// so queue it for later deletion
// so queue it for later deletion during remap cleanup
auto i = remappedPages.find(pageID);
if (i != remappedPages.end()) {
debug_printf("DWALPager(%s) op=freeRemapped %s @%" PRId64 " oldestVersion=%" PRId64 "\n",
@ -3331,7 +3345,11 @@ public:
// Since the next item can be arbitrarily ahead in the queue, secondType is determined by
// looking at the remappedPages structure.
//
// R == Remap F == Free D == Detach | == oldestRetaineedVersion
// R == Remap F == Free D == Detach | == oldestRetainedVersion
//
// oldestRetainedVersion is the oldest version being maintained as readable, either because it is explicitly the
// oldest readable version set or because there is an active snapshot for the version even though it is older
// than the explicitly set oldest readable version.
//
// R R | free new ID
// R F | free new ID if R and D are at different versions
@ -3411,13 +3429,32 @@ public:
}
if (freeNewID) {
debug_printf("DWALPager(%s) remapCleanup freeNew %s\n", self->filename.c_str(), p.toString().c_str());
self->freeUnmappedPage(p.newPageID, 0);
debug_printf("DWALPager(%s) remapCleanup freeNew %s %s\n",
self->filename.c_str(),
p.toString().c_str(),
toString(self->getLastCommittedVersion()).c_str());
// newID must be freed at the latest committed version to avoid a read race between caching and non-caching
// readers. It is possible that there are readers of newID in flight right now that either
// - Did not read through the page cache
// - Did read through the page cache but there was no entry for the page at the time, so one was created
// and the read future is still pending
// In either case the physical read of newID from disk can happen at some time after right now and after the
// current commit is finished.
//
// If newID is freed immediately, meaning as of the end of the current commit, then it could be reused in
// the next commit which could be before any reads fitting the above description have completed, causing
// those reads to see the new write, which is incorrect. Since such readers could be using pager snapshots at
// versions up to and including the latest committed version, newID must be freed *after* that version is no
// longer readable.
self->freeUnmappedPage(p.newPageID, self->getLastCommittedVersion() + 1);
++g_redwoodMetrics.metric.pagerRemapFree;
}
if (freeOriginalID) {
debug_printf("DWALPager(%s) remapCleanup freeOriginal %s\n", self->filename.c_str(), p.toString().c_str());
// originalID can be freed immediately because it is already the case that there are no readers at a version
// prior to oldestRetainedVersion so no reader will need originalID.
self->freeUnmappedPage(p.originalPageID, 0);
++g_redwoodMetrics.metric.pagerRemapFree;
}
@ -3654,6 +3691,7 @@ public:
self->operations.clear();
debug_printf("DWALPager(%s) shutdown destroy page cache\n", self->filename.c_str());
wait(self->extentCache.clear());
wait(self->pageCache.clear());
wait(delay(0));
@ -4575,7 +4613,7 @@ struct BTreePage {
ValueTree* valueTree() const { return (ValueTree*)(this + 1); }
std::string toString(bool write,
std::string toString(const char* context,
BTreePageIDRef id,
Version ver,
const RedwoodRecordRef& lowerBound,
@ -4583,7 +4621,7 @@ struct BTreePage {
std::string r;
r += format("BTreePage op=%s %s @%" PRId64
" ptr=%p height=%d count=%d kvBytes=%d\n lowerBound: %s\n upperBound: %s\n",
write ? "write" : "read",
context,
::toString(id).c_str(),
ver,
this,
@ -4684,24 +4722,43 @@ struct DecodeBoundaryVerifier {
typedef std::map<Version, DecodeBoundaries> BoundariesByVersion;
std::unordered_map<LogicalPageID, BoundariesByVersion> boundariesByPageID;
std::vector<Key> boundarySamples;
int boundarySampleSize = 1000;
int boundaryPopulation = 0;
static DecodeBoundaryVerifier* getVerifier(std::string name) {
static std::map<std::string, DecodeBoundaryVerifier> verifiers;
// Verifier disabled due to not being finished
//
// Only use verifier in a non-restarted simulation so that all page writes are captured
// if (g_network->isSimulated() && !g_simulator.restarted) {
// return &verifiers[name];
// }
if (g_network->isSimulated() && !g_simulator.restarted) {
return &verifiers[name];
}
return nullptr;
}
void sampleBoundary(Key b) {
if (boundaryPopulation <= boundarySampleSize) {
boundarySamples.push_back(b);
} else if (deterministicRandom()->random01() < ((double)boundarySampleSize / boundaryPopulation)) {
boundarySamples[deterministicRandom()->randomInt(0, boundarySampleSize)] = b;
}
++boundaryPopulation;
}
Key getSample() const {
if (boundarySamples.empty()) {
return Key();
}
return boundarySamples[deterministicRandom()->randomInt(0, boundarySamples.size())];
}
void update(BTreePageIDRef id, Version v, Key lowerBound, Key upperBound) {
sampleBoundary(lowerBound);
sampleBoundary(upperBound);
debug_printf("decodeBoundariesUpdate %s %s '%s' to '%s'\n",
::toString(id).c_str(),
::toString(v).c_str(),
lowerBound.toString().c_str(),
upperBound.toString().c_str());
lowerBound.printable().c_str(),
upperBound.printable().c_str());
auto& b = boundariesByPageID[id.front()][v];
ASSERT(b.empty());
@ -4717,28 +4774,53 @@ struct DecodeBoundaryVerifier {
--b;
if (b->second.lower != lowerBound || b->second.upper != upperBound) {
fprintf(stderr,
"Boundary mismatch on %s %s\nFound :%s %s\nExpected:%s %s\n",
"Boundary mismatch on %s %s\nUsing:\n\t'%s'\n\t'%s'\nWritten %s:\n\t'%s'\n\t'%s'\n",
::toString(id).c_str(),
::toString(v).c_str(),
lowerBound.toString().c_str(),
upperBound.toString().c_str(),
b->second.lower.toString().c_str(),
b->second.upper.toString().c_str());
lowerBound.printable().c_str(),
upperBound.printable().c_str(),
::toString(b->first).c_str(),
b->second.lower.printable().c_str(),
b->second.upper.printable().c_str());
return false;
}
return true;
}
void update(Version v, LogicalPageID oldID, LogicalPageID newID) {
debug_printf("decodeBoundariesUpdate copy %s %s to %s\n",
::toString(v).c_str(),
::toString(oldID).c_str(),
::toString(newID).c_str());
auto& old = boundariesByPageID[oldID];
ASSERT(!old.empty());
auto i = old.end();
--i;
boundariesByPageID[newID][v] = i->second;
debug_printf("decodeBoundariesUpdate copy %s %s to %s '%s' to '%s'\n",
::toString(v).c_str(),
::toString(oldID).c_str(),
::toString(newID).c_str(),
i->second.lower.printable().c_str(),
i->second.upper.printable().c_str());
}
void removeAfterVersion(Version version) {
auto i = boundariesByPageID.begin();
while (i != boundariesByPageID.end()) {
auto v = i->second.upper_bound(version);
while (v != i->second.end()) {
debug_printf("decodeBoundariesUpdate remove %s %s '%s' to '%s'\n",
::toString(v->first).c_str(),
::toString(i->first).c_str(),
v->second.lower.printable().c_str(),
v->second.upper.printable().c_str());
v = i->second.erase(v);
}
if (i->second.empty()) {
debug_printf("decodeBoundariesUpdate remove empty map for %s\n", ::toString(i->first).c_str());
i = boundariesByPageID.erase(i);
} else {
++i;
}
}
}
};
@ -5024,8 +5106,14 @@ public:
self->m_newOldestVersion = self->m_pager->getOldestReadableVersion();
debug_printf("Recovered pager to version %" PRId64 ", oldest version is %" PRId64 "\n",
self->getLastCommittedVersion(),
self->m_newOldestVersion);
// Clear any changes that occurred after the latest committed version
if (self->m_pBoundaryVerifier != nullptr) {
self->m_pBoundaryVerifier->removeAfterVersion(self->getLastCommittedVersion());
}
state Key meta = self->m_pager->getMetaKey();
if (meta.size() == 0) {
// Create new BTree
@ -5825,10 +5913,17 @@ private:
const RedwoodRecordRef& lowerBound,
const RedwoodRecordRef& upperBound) {
if (page->userData == nullptr) {
debug_printf("Creating DecodeCache for ptr=%p lower=%s upper=%s\n",
debug_printf("Creating DecodeCache for ptr=%p lower=%s upper=%s %s\n",
page->begin(),
lowerBound.toString(false).c_str(),
upperBound.toString(false).c_str());
upperBound.toString(false).c_str(),
((BTreePage*)page->begin())
->toString("cursor",
lowerBound.value.present() ? lowerBound.getChildPage() : BTreePageIDRef(),
-1,
lowerBound,
upperBound)
.c_str());
BTreePage::BinaryTree::DecodeCache* cache =
new BTreePage::BinaryTree::DecodeCache(lowerBound, upperBound, m_pDecodeCacheMemory);
@ -5890,12 +5985,13 @@ private:
BTreePage* btPage = (BTreePage*)page->begin();
BTreePage::BinaryTree::DecodeCache* cache = (BTreePage::BinaryTree::DecodeCache*)page->userData;
debug_printf_always(
"updateBTreePage(%s, %s) %s\n",
"updateBTreePage(%s, %s) start, page:\n%s\n",
::toString(oldID).c_str(),
::toString(writeVersion).c_str(),
cache == nullptr
? "<noDecodeCache>"
: btPage->toString(true, oldID, writeVersion, cache->lowerBound, cache->upperBound).c_str());
: btPage->toString("updateBTreePage", oldID, writeVersion, cache->lowerBound, cache->upperBound)
.c_str());
}
state unsigned int height = (unsigned int)((BTreePage*)page->begin())->height;
@ -5912,7 +6008,11 @@ private:
LogicalPageID id = wait(self->m_pager->newPageID());
emptyPages[i] = id;
}
debug_printf("updateBTreePage: newPages %s", toString(emptyPages).c_str());
debug_printf("updateBTreePage(%s, %s): newPages %s",
::toString(oldID).c_str(),
::toString(writeVersion).c_str(),
toString(emptyPages).c_str());
self->m_pager->updatePage(PagerEventReasons::Commit, height, emptyPages, page);
i = 0;
for (const LogicalPageID id : emptyPages) {
@ -5956,13 +6056,15 @@ private:
RedwoodRecordRef decodeLowerBound;
RedwoodRecordRef decodeUpperBound;
// Returns true if BTree logical boundaries and DeltaTree decoding boundaries are the same.
bool boundariesNormal() const {
// If the decode upper boundary is the subtree upper boundary the pointers will be the same
// For the lower boundary, if the pointers are not the same there is still a possibility
// that the keys are the same. This happens for the first remaining subtree of an internal page
// after the prior subtree(s) were cleared.
return (decodeUpperBound == subtreeUpperBound) &&
(decodeLowerBound == subtreeLowerBound || decodeLowerBound.sameExceptValue(subtreeLowerBound));
// Often these strings will refer to the same memory so same() is used as a faster way of determining
// equality in the common case, but if it does not match a string comparison is needed as they can
// still be the same. This can happen for the first remaining subtree of an internal page
// after all prior subtree(s) were cleared.
return (
(decodeUpperBound.key.same(subtreeUpperBound.key) || decodeUpperBound.key == subtreeUpperBound.key) &&
(decodeLowerBound.key.same(subtreeLowerBound.key) || decodeLowerBound.key == subtreeLowerBound.key));
}
// The record range of the subtree slice is cBegin to cEnd
@ -6026,6 +6128,7 @@ private:
// Set the child page ID, which has already been allocated in result.arena()
newLinks.back().setChildPage(maybeNewID);
childrenChanged = true;
expectedUpperBound = decodeUpperBound;
} else {
childrenChanged = false;
}
@ -6070,6 +6173,7 @@ private:
s += format("SubtreeUpper: %s\n", subtreeUpperBound.toString(false).c_str());
s += format("expectedUpperBound: %s\n",
expectedUpperBound.present() ? expectedUpperBound.get().toString(false).c_str() : "(null)");
s += format("newLinks:\n");
for (int i = 0; i < newLinks.size(); ++i) {
s += format(" %i: %s\n", i, newLinks[i].toString(false).c_str());
}
@ -6178,10 +6282,10 @@ private:
// This must be called for each of the InternalPageSliceUpdates in sorted order.
void applyUpdate(InternalPageSliceUpdate& u, const RedwoodRecordRef* nextBoundary) {
debug_printf("applyUpdate nextBoundary=(%p) %s %s\n",
debug_printf("applyUpdate nextBoundary=(%p) %s\n",
nextBoundary,
(nextBoundary != nullptr) ? nextBoundary->toString(false).c_str() : "",
u.toString().c_str());
(nextBoundary != nullptr) ? nextBoundary->toString(false).c_str() : "");
debug_print(addPrefix("applyUpdate", u.toString()));
// If the children changed, replace [cBegin, cEnd) with newLinks
if (u.childrenChanged) {
@ -6195,7 +6299,7 @@ private:
}
while (c != u.cEnd) {
debug_printf("internal page (updating) erasing: %s\n", c.get().toString(false).c_str());
debug_printf("applyUpdate (updating) erasing: %s\n", c.get().toString(false).c_str());
btPage()->kvBytes -= c.get().kvBytes();
c.erase();
}
@ -6226,7 +6330,7 @@ private:
keep(u.cBegin, u.cEnd);
}
// If there is an expected upper boundary for the next range after u
// If there is an expected upper boundary for the next range start after u
if (u.expectedUpperBound.present()) {
// Then if it does not match the next boundary then insert a dummy record
if (nextBoundary == nullptr || (nextBoundary != &u.expectedUpperBound.get() &&
@ -6253,23 +6357,29 @@ private:
state std::string context;
if (REDWOOD_DEBUG) {
context = format("CommitSubtree(root=%s): ", toString(rootID).c_str());
context = format("CommitSubtree(root=%s+%d %s): ",
toString(rootID.front()).c_str(),
rootID.size() - 1,
::toString(batch->writeVersion).c_str());
}
debug_printf("%s %s\n", context.c_str(), update->toString().c_str());
debug_printf("%s rootID=%s\n", context.c_str(), toString(rootID).c_str());
debug_print(addPrefix(context, update->toString()));
if (REDWOOD_DEBUG) {
debug_printf("%s ---------MUTATION BUFFER SLICE ---------------------\n", context.c_str());
auto begin = mBegin;
int c = 0;
auto i = mBegin;
while (1) {
debug_printf("%s Mutation: '%s': %s\n",
debug_printf("%s Mutation %4d '%s': %s\n",
context.c_str(),
printable(begin.key()).c_str(),
begin.mutation().toString().c_str());
if (begin == mEnd) {
c,
printable(i.key()).c_str(),
i.mutation().toString().c_str());
if (i == mEnd) {
break;
}
++begin;
++c;
++i;
}
debug_printf("%s -------------------------------------\n", context.c_str());
}
state Reference<const ArenaPage> page =
@ -6291,13 +6401,13 @@ private:
// TryToUpdate indicates insert and erase operations should be tried on the existing page first
state bool tryToUpdate = btPage->tree()->numItems > 0 && update->boundariesNormal();
debug_printf(
"%s commitSubtree(): %s\n",
context.c_str(),
btPage
->toString(
false, rootID, batch->snapshot->getVersion(), update->decodeLowerBound, update->decodeUpperBound)
.c_str());
debug_printf("%s tryToUpdate=%d\n", context.c_str(), tryToUpdate);
debug_print(addPrefix(context,
btPage->toString("commitSubtreeStart",
rootID,
batch->snapshot->getVersion(),
update->decodeLowerBound,
update->decodeUpperBound)));
state BTreePage::BinaryTree::Cursor cursor = update->cBegin.valid()
? self->getCursor(page.getPtr(), update->cBegin)
@ -6312,22 +6422,6 @@ private:
}
}
if (REDWOOD_DEBUG) {
debug_printf("%s ---------MUTATION BUFFER SLICE ---------------------\n", context.c_str());
auto begin = mBegin;
while (1) {
debug_printf("%s Mutation: '%s': %s\n",
context.c_str(),
printable(begin.key()).c_str(),
begin.mutation().toString().c_str());
if (begin == mEnd) {
break;
}
++begin;
}
debug_printf("%s -------------------------------------\n", context.c_str());
}
// Leaf Page
if (btPage->isLeaf()) {
// When true, we are modifying the existing DeltaTree
@ -6566,9 +6660,8 @@ private:
// No changes were actually made. This could happen if the only mutations are clear ranges which do not
// match any records.
if (!changesMade) {
debug_printf("%s No changes were made during mutation merge, returning %s\n",
context.c_str(),
toString(*update).c_str());
debug_printf("%s No changes were made during mutation merge, returning slice:\n", context.c_str());
debug_print(addPrefix(context, update->toString()));
return Void();
} else {
debug_printf(
@ -6581,17 +6674,26 @@ private:
if (cursor.tree->numItems == 0) {
update->cleared();
self->freeBTreePage(height, rootID, batch->writeVersion);
debug_printf("%s Page updates cleared all entries, returning %s\n",
context.c_str(),
toString(*update).c_str());
debug_printf("%s Page updates cleared all entries, returning slice:\n", context.c_str());
debug_print(addPrefix(context, update->toString()));
} else {
// Otherwise update it.
BTreePageIDRef newID = wait(self->updateBTreePage(
self, rootID, &update->newLinks.arena(), pageCopy.castTo<ArenaPage>(), batch->writeVersion));
debug_printf("%s Leaf node updated in-place at version %s, new contents:\n",
context.c_str(),
toString(batch->writeVersion).c_str());
debug_print(addPrefix(context,
btPage->toString("updateLeafNode",
newID,
batch->snapshot->getVersion(),
update->decodeLowerBound,
update->decodeUpperBound)));
update->updatedInPlace(newID, btPage, newID.size() * self->m_blockSize);
debug_printf(
"%s Page updated in-place, returning %s\n", context.c_str(), toString(*update).c_str());
debug_printf("%s Leaf node updated in-place, returning slice:\n", context.c_str());
debug_print(addPrefix(context, update->toString()));
}
return Void();
}
@ -6601,9 +6703,8 @@ private:
update->cleared();
self->freeBTreePage(height, rootID, batch->writeVersion);
debug_printf("%s All leaf page contents were cleared, returning %s\n",
context.c_str(),
toString(*update).c_str());
debug_printf("%s All leaf page contents were cleared, returning slice:\n", context.c_str());
debug_print(addPrefix(context, update->toString()));
return Void();
}
@ -6619,7 +6720,8 @@ private:
// Put new links into update and tell update that pages were rebuilt
update->rebuilt(entries);
debug_printf("%s Merge complete, returning %s\n", context.c_str(), toString(*update).c_str());
debug_printf("%s Merge complete, returning slice:\n", context.c_str());
debug_print(addPrefix(context, update->toString()));
return Void();
} else {
// Internal Page
@ -6668,8 +6770,8 @@ private:
if (!cursor.get().value.present()) {
// If the upper bound is provided by a dummy record in [cBegin, cEnd) then there is no
// requirement on the next subtree range or the parent page to have a specific upper boundary
// for decoding the subtree.
u.expectedUpperBound.reset();
// for decoding the subtree. The expected upper bound has not yet been set so it can remain
// empty.
cursor.moveNext();
// If there is another record after the null child record, it must have a child page value
ASSERT(!cursor.valid() || cursor.get().value.present());
@ -6756,12 +6858,12 @@ private:
RedwoodRecordRef rec = c.get();
if (rec.value.present()) {
if (height == 2) {
debug_printf("%s: freeing child page in cleared subtree range: %s\n",
debug_printf("%s freeing child page in cleared subtree range: %s\n",
context.c_str(),
::toString(rec.getChildPage()).c_str());
self->freeBTreePage(height, rec.getChildPage(), batch->writeVersion);
} else {
debug_printf("%s: queuing subtree deletion cleared subtree range: %s\n",
debug_printf("%s queuing subtree deletion cleared subtree range: %s\n",
context.c_str(),
::toString(rec.getChildPage()).c_str());
self->m_lazyClearQueue.pushBack(LazyClearQueueEntry{
@ -6774,9 +6876,8 @@ private:
// Subtree range unchanged
}
debug_printf("%s: MutationBuffer covers this range in a single mutation, not recursing: %s\n",
context.c_str(),
u.toString().c_str());
debug_printf("%s Not recursing, one mutation range covers this slice:\n", context.c_str());
debug_print(addPrefix(context, u.toString()));
// u has already been initialized with the correct result, no recursion needed, so restart the
// loop.
@ -6785,6 +6886,9 @@ private:
}
// If this page has height of 2 then its children are leaf nodes
debug_printf("%s Recursing for %s\n", context.c_str(), toString(pageID).c_str());
debug_print(addPrefix(context, u.toString()));
recursions.push_back(self->commitSubtree(self, batch, pageID, height - 1, mBegin, mEnd, &u));
}
@ -6823,10 +6927,11 @@ private:
// passed, so in the event a different upper boundary is needed it will be added to the already-modified
// page. Otherwise, the decode boundary is used which will prevent this page from being modified for the
// sole purpose of adding a dummy upper bound record.
debug_printf("%s Applying final child range update. changesMade=%d Parent update is: %s\n",
debug_printf("%s Applying final child range update. changesMade=%d\nSubtree Root Update:\n",
context.c_str(),
modifier.changesMade,
update->toString().c_str());
modifier.changesMade);
debug_print(addPrefix(context, update->toString()));
modifier.applyUpdate(*slices.back(),
modifier.changesMade ? &update->subtreeUpperBound : &update->decodeUpperBound);
@ -6859,9 +6964,11 @@ private:
if (modifier.changesMade || forceUpdate) {
if (modifier.empty()) {
update->cleared();
debug_printf("%s All internal page children were deleted so deleting this page too, returning %s\n",
context.c_str(),
toString(*update).c_str());
debug_printf(
"%s All internal page children were deleted so deleting this page too. Returning slice:\n",
context.c_str());
debug_print(addPrefix(context, update->toString()));
self->freeBTreePage(height, rootID, batch->writeVersion);
} else {
if (modifier.updating) {
@ -6899,9 +7006,10 @@ private:
}
parentInfo->clear();
if (forceUpdate && detached == 0) {
debug_printf("%s No children detached during forced update, returning %s\n",
context.c_str(),
toString(*update).c_str());
debug_printf("%s No children detached during forced update, returning slice:\n",
context.c_str());
debug_print(addPrefix(context, update->toString()));
return Void();
}
}
@ -6912,21 +7020,19 @@ private:
pageCopy.castTo<ArenaPage>(),
batch->writeVersion));
debug_printf(
"%s commitSubtree(): Internal page updated in-place at version %s, new contents: %s\n",
"%s commitSubtree(): Internal node updated in-place at version %s, new contents:\n",
context.c_str(),
toString(batch->writeVersion).c_str(),
btPage
->toString(false,
newID,
batch->snapshot->getVersion(),
update->decodeLowerBound,
update->decodeUpperBound)
.c_str());
toString(batch->writeVersion).c_str());
debug_print(addPrefix(context,
btPage->toString("updateInternalNode",
newID,
batch->snapshot->getVersion(),
update->decodeLowerBound,
update->decodeUpperBound)));
update->updatedInPlace(newID, btPage, newID.size() * self->m_blockSize);
debug_printf("%s Internal page updated in-place, returning %s\n",
context.c_str(),
toString(*update).c_str());
debug_printf("%s Internal node updated in-place, returning slice:\n", context.c_str());
debug_print(addPrefix(context, update->toString()));
} else {
// Page was rebuilt, possibly split.
debug_printf("%s Internal page could not be modified, rebuilding replacement(s).\n",
@ -6973,12 +7079,13 @@ private:
rootID));
update->rebuilt(newChildEntries);
debug_printf(
"%s Internal page rebuilt, returning %s\n", context.c_str(), toString(*update).c_str());
debug_printf("%s Internal page rebuilt, returning slice:\n", context.c_str());
debug_print(addPrefix(context, update->toString()));
}
}
} else {
debug_printf("%s Page has no changes, returning %s\n", context.c_str(), toString(*update).c_str());
debug_printf("%s Page has no changes, returning slice:\n", context.c_str());
debug_print(addPrefix(context, update->toString()));
}
return Void();
}
@ -9472,9 +9579,11 @@ TEST_CASE("Lredwood/correctness/btree") {
state double clearProbability =
params.getDouble("clearProbability").orDefault(deterministicRandom()->random01() * .1);
state double clearExistingBoundaryProbability =
params.getDouble("clearProbability").orDefault(deterministicRandom()->random01() * .5);
params.getDouble("clearExistingBoundaryProbability").orDefault(deterministicRandom()->random01() * .5);
state double clearSingleKeyProbability =
params.getDouble("clearSingleKeyProbability").orDefault(deterministicRandom()->random01());
params.getDouble("clearSingleKeyProbability").orDefault(deterministicRandom()->random01() * .1);
state double clearKnownNodeBoundaryProbability =
params.getDouble("clearKnownNodeBoundaryProbability").orDefault(deterministicRandom()->random01() * .1);
state double clearPostSetProbability =
params.getDouble("clearPostSetProbability").orDefault(deterministicRandom()->random01() * .1);
state double coldStartProbability =
@ -9495,10 +9604,11 @@ TEST_CASE("Lredwood/correctness/btree") {
// These settings are an attempt to keep the test execution real reasonably short
state int64_t maxPageOps = params.getInt("maxPageOps").orDefault((shortTest || serialTest) ? 50e3 : 1e6);
state int maxVerificationMapEntries =
params.getInt("maxVerificationMapEntries").orDefault((1.0 - coldStartProbability) * 300e3);
state int maxVerificationMapEntries = params.getInt("maxVerificationMapEntries").orDefault(300e3);
state int maxColdStarts = params.getInt("maxColdStarts").orDefault(300);
// Max number of records in the BTree or the versioned written map to visit
state int64_t maxRecordsRead = 300e6;
state int64_t maxRecordsRead = params.getInt("maxRecordsRead").orDefault(300e6);
printf("\n");
printf("file: %s\n", file.c_str());
@ -9516,9 +9626,11 @@ TEST_CASE("Lredwood/correctness/btree") {
printf("setExistingKeyProbability: %f\n", setExistingKeyProbability);
printf("clearProbability: %f\n", clearProbability);
printf("clearExistingBoundaryProbability: %f\n", clearExistingBoundaryProbability);
printf("clearKnownNodeBoundaryProbability: %f\n", clearKnownNodeBoundaryProbability);
printf("clearSingleKeyProbability: %f\n", clearSingleKeyProbability);
printf("clearPostSetProbability: %f\n", clearPostSetProbability);
printf("coldStartProbability: %f\n", coldStartProbability);
printf("maxColdStarts: %d\n", maxColdStarts);
printf("advanceOldVersionProbability: %f\n", advanceOldVersionProbability);
printf("pageCacheBytes: %s\n", pageCacheBytes == 0 ? "default" : format("%" PRId64, pageCacheBytes).c_str());
printf("versionIncrement: %" PRId64 "\n", versionIncrement);
@ -9534,9 +9646,11 @@ TEST_CASE("Lredwood/correctness/btree") {
state VersionedBTree* btree = new VersionedBTree(pager, file);
wait(btree->init());
state DecodeBoundaryVerifier* pBoundaries = DecodeBoundaryVerifier::getVerifier(file);
state std::map<std::pair<std::string, Version>, Optional<std::string>> written;
state int64_t totalRecordsRead = 0;
state std::set<Key> keys;
state int coldStarts = 0;
state Version lastVer = btree->getLastCommittedVersion();
printf("Starting from version: %" PRId64 "\n", lastVer);
@ -9595,6 +9709,21 @@ TEST_CASE("Lredwood/correctness/btree") {
end = *i;
}
if (!pBoundaries->boundarySamples.empty() &&
deterministicRandom()->random01() < clearKnownNodeBoundaryProbability) {
start = pBoundaries->getSample();
// Can't allow the end boundary to be a start, so just convert to empty string.
if (start == VersionedBTree::dbEnd.key) {
start = Key();
}
}
if (!pBoundaries->boundarySamples.empty() &&
deterministicRandom()->random01() < clearKnownNodeBoundaryProbability) {
end = pBoundaries->getSample();
}
// Do a single key clear based on probability or end being randomly chosen to be the same as begin
// (unlikely)
if (deterministicRandom()->random01() < clearSingleKeyProbability || end == start) {
@ -9730,7 +9859,9 @@ TEST_CASE("Lredwood/correctness/btree") {
mutationBytesTargetThisCommit = randomSize(maxCommitSize);
// Recover from disk at random
if (!pagerMemoryOnly && deterministicRandom()->random01() < coldStartProbability) {
if (!pagerMemoryOnly && coldStarts < maxColdStarts &&
deterministicRandom()->random01() < coldStartProbability) {
++coldStarts;
printf("Recovering from disk after next commit.\n");
// Wait for outstanding commit
@ -10239,7 +10370,7 @@ TEST_CASE(":/redwood/performance/set") {
state Future<Void> stats =
traceMetrics ? Void()
: repeatEvery(1.0, [&]() { printf("Stats:\n%s\n", g_redwoodMetrics.toString(true).c_str()); });
: recurring([&]() { printf("Stats:\n%s\n", g_redwoodMetrics.toString(true).c_str()); }, 1.0);
if (scans > 0) {
printf("Parallel scans, concurrency=%d, scans=%d, scanWidth=%d, scanPreftchBytes=%d ...\n",

View File

@ -294,6 +294,13 @@ private:
};
UID getSharedMemoryMachineId() {
// new UID to use if an existing one is not found
UID newUID = deterministicRandom()->randomUniqueID();
#if DEBUG_DETERMINISM
// Don't use shared memory if DEBUG_DETERMINISM is set
return newUID;
#else
UID* machineId = nullptr;
int numTries = 0;
@ -306,7 +313,7 @@ UID getSharedMemoryMachineId() {
// "0" is the default parameter "addr"
boost::interprocess::managed_shared_memory segment(
boost::interprocess::open_or_create, sharedMemoryIdentifier.c_str(), 1000, 0, p.permission);
machineId = segment.find_or_construct<UID>("machineId")(deterministicRandom()->randomUniqueID());
machineId = segment.find_or_construct<UID>("machineId")(newUID);
if (!machineId)
criticalError(
FDB_EXIT_ERROR, "SharedMemoryError", "Could not locate or create shared memory - 'machineId'");
@ -330,6 +337,7 @@ UID getSharedMemoryMachineId() {
}
}
}
#endif
}
ACTOR void failAfter(Future<Void> trigger, ISimulator::ProcessInfo* m = g_simulator.getCurrentProcess()) {

View File

@ -638,6 +638,9 @@ public:
return tokens;
}
// True if both StringRefs reference exactly the same memory
bool same(const StringRef& s) const { return data == s.data && length == s.length; }
private:
// Unimplemented; blocks conversion through std::string
StringRef(char*);

View File

@ -275,7 +275,7 @@ double
timer(); // Returns the system real time clock with high precision. May jump around when system time is adjusted!
double timer_monotonic(); // Returns a high precision monotonic clock which is adjusted to be kind of similar to timer()
// at startup, but might not be a globally accurate time.
uint64_t timer_int(); // Return timer as uint64_t
uint64_t timer_int(); // Return timer as uint64_t representing epoch nanoseconds
void getLocalTime(const time_t* timep, struct tm* result);

View File

@ -221,6 +221,7 @@ Future<T> delayed(Future<T> what, double time = 0.0, TaskPriority taskID = TaskP
}
}
// wait <interval> then call what() in a loop forever
ACTOR template <class Func>
Future<Void> recurring(Func what, double interval, TaskPriority taskID = TaskPriority::DefaultDelay) {
loop choose {
@ -2048,15 +2049,6 @@ private:
Reference<UnsafeWeakFutureReferenceData> data;
};
// Call a lambda every <interval> seconds
ACTOR template <typename Fn>
Future<Void> repeatEvery(double interval, Fn fn) {
loop {
wait(delay(interval));
fn();
}
}
#include "flow/unactorcompiler.h"
#endif

View File

@ -571,6 +571,10 @@ public:
// A wrapper for directly getting the system time. The time returned by now() only updates in the run loop,
// so it cannot be used to measure times of functions that do not have wait statements.
// Simulation version of timer_int for convenience, based on timer()
// Returns epoch nanoseconds
uint64_t timer_int() { return (uint64_t)(g_network->timer() * 1e9); }
virtual double timer_monotonic() = 0;
// Similar to timer, but monotonic