Bug fixes. COWPager's page cache was being initialized too late in recovery, after it had already been used. Cursor's KeyValueRef memory sometimes pointed to freed memory from an InternalCursor that had been moved. Added valgrind macros to avoid false positives.
parent 30c56536bd
commit fa357ef1ca
@@ -340,6 +340,8 @@ public:
     void writePage() {
         debug_printf("FIFOQueue(%s): write page id=%u\n", queue->name.c_str(), pageID);
+        VALGRIND_MAKE_MEM_DEFINED(raw()->begin(), offset);
+        VALGRIND_MAKE_MEM_DEFINED(raw()->begin() + offset, queue->dataBytesPerPage - raw()->endOffset);
         queue->pager->updatePage(pageID, page);
     }
 
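For readers unfamiliar with the macros added above: VALGRIND_MAKE_MEM_DEFINED comes from <valgrind/memcheck.h> and tells Memcheck to treat a byte range as initialized; it has no effect when the program is not running under Valgrind. A minimal sketch of the pattern, assuming the Valgrind development headers are installed (the buffer and function names below are hypothetical, not from this commit):

    #include <valgrind/memcheck.h>  // VALGRIND_MAKE_MEM_DEFINED
    #include <cstddef>
    #include <cstdint>
    #include <vector>

    // Hypothetical page flush: only the first 'used' bytes were filled in by the
    // caller, but the whole fixed-size page is written out. Marking the unwritten
    // tail as "defined" suppresses Memcheck's uninitialised-byte reports on the
    // write, since that padding is never read back.
    void flushPage(std::vector<uint8_t>& page, size_t used) {
        VALGRIND_MAKE_MEM_DEFINED(page.data() + used, page.size() - used);
        // writeToDisk(page.data(), page.size());  // actual I/O elided
    }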
@@ -351,13 +353,13 @@ public:
     Future<Void> writeTail(const T &item) {
         ASSERT(loading.isReady());
-        debug_printf("FIFOQueue(%s): writeTail(%s) to %u:%d\n", queue->name.c_str(), toString(item).c_str(), pageID, offset);
         auto p = raw();
         int bytesNeeded = Codec::bytesNeeded(item);
         if(offset + bytesNeeded > queue->dataBytesPerPage) {
             newTailPage();
             return waitThenWriteTail(this, item);
         }
+        debug_printf("FIFOQueue(%s): writeTail(%s) to %u:%d\n", queue->name.c_str(), toString(item).c_str(), pageID, offset);
         Codec::writeToBytes(p->begin() + offset, item);
         ++queue->numEntries;
         offset += bytesNeeded;
@@ -373,7 +375,6 @@ public:
     Future<Void> writeHead(const T &item) {
         ASSERT(loading.isReady());
-        debug_printf("FIFOQueue(%s): writeHead(%s) to %u:%d\n", queue->name.c_str(), toString(item).c_str(), pageID, offset);
         int bytesNeeded = Codec::bytesNeeded(item);
         if(offset < bytesNeeded) {
             newHeadPage();
@@ -381,6 +382,7 @@ public:
         }
         offset -= bytesNeeded;
         auto p = raw();
+        debug_printf("FIFOQueue(%s): writeHead(%s) to %u:%d\n", queue->name.c_str(), toString(item).c_str(), pageID, offset);
         Codec::writeToBytes(p->begin() + offset, item);
         ++queue->numEntries;
         return Void();
@@ -771,6 +773,11 @@ public:
         cache.clear();
     }
 
+    int count() const {
+        ASSERT(evictionOrder.size() == cache.size());
+        return evictionOrder.size();
+    }
+
 private:
     struct Entry : public boost::intrusive::list_base_hook<> {
         IndexType index;
@@ -778,10 +785,10 @@ private:
     };
 
     int sizeLimit;
-    boost::intrusive::list<Entry> evictionOrder;
 
     // TODO: Use boost intrusive unordered set instead, with a comparator that only considers entry.index
     std::unordered_map<IndexType, Entry> cache;
+    boost::intrusive::list<Entry> evictionOrder;
 };
 
 ACTOR template<class T> Future<T> forwardError(Future<T> f, Promise<Void> target) {
@@ -837,6 +844,8 @@ public:
         if(pHeader != nullptr) {
             pHeader->pageSize = logicalPageSize;
         }
+        ASSERT(pageCache.count() == 0);
+        pageCache = PageCacheT(pageCacheBytes / physicalPageSize);
     }
 
     void updateCommittedHeader() {
@@ -972,8 +981,6 @@ public:
             wait(self->commit());
         }
 
-        self->pageCache = PageCacheT(self->pageCacheBytes / self->physicalPageSize);
-
         debug_printf("COWPager(%s) recovered. committedVersion=%" PRId64 " logicalPageSize=%d physicalPageSize=%d\n", self->filename.c_str(), self->pHeader->committedVersion, self->logicalPageSize, self->physicalPageSize);
         return Void();
     }
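Taken together, the previous two hunks are the fix for the first bug in the commit message: the page cache is now constructed in the page-size setup shown above, which runs before recovery reads any pages, rather than at the end of recovery after the cache had already been used, and the new count() == 0 assertion enforces that ordering. A rough sketch of the shape of that guard, using hypothetical stand-ins for COWPager's PageCacheT (an illustration of the intent, not the actual pager code):

    #include <cassert>
    #include <cstdint>
    #include <unordered_map>
    #include <vector>

    // Hypothetical stand-in for the pager's page cache: its capacity depends on the
    // physical page size, which is only known after the file header has been read.
    struct PageCache {
        explicit PageCache(size_t limit = 0) : sizeLimit(limit) {}
        size_t count() const { return pages.size(); }
        size_t sizeLimit;
        std::unordered_map<uint64_t, std::vector<uint8_t>> pages;
    };

    struct Pager {
        size_t pageCacheBytes = 64 << 20;
        size_t physicalPageSize = 4096;
        PageCache pageCache;

        // Mirrors the intent of the fix: size the cache as soon as the page size is
        // known, and assert that nothing was cached through the unsized cache first.
        void setPageSize(size_t newPhysicalPageSize) {
            physicalPageSize = newPhysicalPageSize;
            assert(pageCache.count() == 0);
            pageCache = PageCache(pageCacheBytes / physicalPageSize);
        }
    };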
@@ -3059,7 +3066,7 @@ private:
             ASSERT(blockCount > 1);
             int size = blockSize * blockCount;
             btPage = (BTreePage *)new uint8_t[size];
-            VALGRIND_MAKE_MEM_DEFINED(btPageMem, size);
+            VALGRIND_MAKE_MEM_DEFINED(btPage, size);
         }
 
         btPage->formatVersion = BTreePage::FORMAT_VERSION;
@@ -3155,7 +3162,7 @@ private:
         while(records.size() > 1) {
             self->m_header.height = ++height;
             Standalone<VectorRef<RedwoodRecordRef>> newRecords = wait(writePages(self, false, &dbBegin, &dbEnd, records, 0, height, version, BTreePageID()));
-            debug_printf_always("Wrote a new root level at version %" PRId64 " height %d size %lu pages\n", version, height, newRecords.size());
+            debug_printf("Wrote a new root level at version %" PRId64 " height %d size %lu pages\n", version, height, newRecords.size());
             records = newRecords;
         }
 
@@ -3242,7 +3249,7 @@ private:
         debug_printf("readPage() %s\n", pTreePage->toString(false, id, snapshot->getVersion(), lowerBound, upperBound).c_str());
 
         // Nothing should attempt to read bytes in the page outside the BTreePage structure
-        VALGRIND_MAKE_MEM_UNDEFINED(result->begin() + pTreePage->size(), result->size() - pTreePage->size());
+        VALGRIND_MAKE_MEM_UNDEFINED(page->begin() + pTreePage->size(), page->size() - pTreePage->size());
 
         return page;
     }
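This hunk uses the complementary macro: VALGRIND_MAKE_MEM_UNDEFINED poisons a byte range so that Memcheck reports any later read of it, which is how the rule in the comment (nothing reads past the BTreePage structure) gets enforced under Valgrind; the one-line change also points the poisoning at page, the buffer that is actually returned. A compact sketch of the poisoning pattern, under the same assumptions and with hypothetical names as in the earlier sketch:

    #include <valgrind/memcheck.h>  // VALGRIND_MAKE_MEM_UNDEFINED
    #include <cstddef>
    #include <cstdint>
    #include <vector>

    // Hypothetical: a structure of 'structSize' bytes sits at the front of a cached
    // page; poison the rest so Memcheck flags any code that reads past the structure.
    void poisonTail(std::vector<uint8_t>& page, size_t structSize) {
        VALGRIND_MAKE_MEM_UNDEFINED(page.data() + structSize, page.size() - structSize);
    }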
@@ -4195,6 +4202,7 @@ private:
         self->m_arena = Arena();
         const RedwoodRecordRef &rec = self->m_cur1.get();
 
+        self->m_kv.reset();
         debug_printf("readFullKVPair: Starting at %s\n", self->toString().c_str());
 
         // Unsplit value, cur1 will hold the key and value memory
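This last hunk appears to belong to the cursor fix described in the commit message: the cursor's KeyValueRef could keep referencing memory owned by an InternalCursor that had since been moved. A generic illustration of that hazard in plain C++ (not Redwood code); the usual remedies are to copy the borrowed bytes into memory the cursor itself owns, or to rebuild the reference after the move:

    #include <string>
    #include <string_view>

    // Generic illustration of the bug class: a non-owning view (KeyValueRef plays
    // this role in Redwood) borrows memory from an object that is later replaced.
    struct CursorLike {
        std::string key;  // owns the bytes the view below borrows
    };

    int main() {
        CursorLike cur{std::string(64, 'k')};    // long enough to defeat small-string optimization
        std::string_view borrowed = cur.key;     // borrows cur's heap buffer

        cur = CursorLike{std::string(64, 'x')};  // cur is replaced; its old buffer may be freed
        // 'borrowed' may now point at freed memory; using it is undefined behavior.
        // Safe alternatives: copy the bytes before replacing 'cur', or rebuild the
        // view from the new state afterwards.
        (void)borrowed;
        return 0;
    }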