Lots of bug fixes around page reads and concurrency.

This commit is contained in:
Stephen Atherton 2018-07-03 15:39:32 -07:00
parent b95a2bd6c1
commit 09e68a4335
6 changed files with 84 additions and 42 deletions

View File

@ -30,29 +30,30 @@ struct SumType {
std::string toString() { return format("0x%08x%08x", part1, part2); }
};
void checksum(std::string const &file, Reference<const IPage> page, LogicalPageID logical, PhysicalPageID physical, bool write) {
void checksum(std::string const &file, uint8_t *page, int pageSize, LogicalPageID logical, PhysicalPageID physical, bool write) {
// Calculates and then stores or verifies the checksum at the end of the page.
// If write is true then the checksum is written into the page
// If write is false then the checksum is compared to the in-page sum and
// and error will be thrown if they do not match.
uint8_t *pData = (uint8_t *)page->begin();
ASSERT(sizeof(SumType) == IndirectShadowPage::PAGE_OVERHEAD_BYTES);
// Adjust pageSize to refer to only usable storage bytes
pageSize -= IndirectShadowPage::PAGE_OVERHEAD_BYTES;
SumType sum;
SumType *pSumInPage = (SumType *)(pData + page->size());
SumType *pSumInPage = (SumType *)(page + pageSize);
// Write sum directly to page or to sum variable based on mode
SumType *sumOut = write ? pSumInPage : &sum;
sumOut->part1 = physical;
sumOut->part2 = logical;
hashlittle2(pData, page->size(), &sumOut->part1, &sumOut->part2);
hashlittle2(page, pageSize, &sumOut->part1, &sumOut->part2);
debug_printf("checksum %s logical %d physical %d checksums page %s calculated %s\n", write ? "write" : "read", logical, physical, write ? "NA" : pSumInPage->toString().c_str(), sumOut->toString().c_str());
debug_printf("checksum %s logical %d physical %d size %d checksums page %s calculated %s data at %p %s\n", write ? "write" : "read", logical, physical, pageSize, write ? "NA" : pSumInPage->toString().c_str(), sumOut->toString().c_str(), page, StringRef(page, pageSize).toHexString().c_str());
// Verify if not in write mode
if(!write && sum != *pSumInPage) {
auto e = checksum_failed();
TraceEvent (SevError, "IndirectShadowPagerPageChecksumFailure")
.detail("UserPageSize", page->size())
.detail("UserPageSize", pageSize)
.detail("Filename", file.c_str())
.detail("LogicalPage", logical)
.detail("PhysicalPage", physical)
@ -63,7 +64,7 @@ void checksum(std::string const &file, Reference<const IPage> page, LogicalPageI
}
}
IndirectShadowPage::IndirectShadowPage() : allocated(true) {
IndirectShadowPage::IndirectShadowPage() : fastAllocated(true) {
data = (uint8_t*)FastAllocator<4096>::allocate();
#if VALGRIND
// Prevent valgrind errors caused by writing random unneeded bytes to disk.
@ -71,12 +72,13 @@ IndirectShadowPage::IndirectShadowPage() : allocated(true) {
#endif
}
IndirectShadowPage::IndirectShadowPage(uint8_t *data) : data(data), allocated(false) {}
IndirectShadowPage::~IndirectShadowPage() {
if(allocated) {
if(fastAllocated) {
FastAllocator<4096>::release(data);
}
else if(file) {
file->releaseZeroCopy(data, PAGE_BYTES, physicalPageID * PAGE_BYTES);
}
}
uint8_t const* IndirectShadowPage::begin() const {
@ -416,9 +418,9 @@ ACTOR Future<Void> waitAndFreePhysicalPageID(IndirectShadowPager *pager, Physica
// Otherwise, the page could be rewritten prematurely.
void IndirectShadowPager::freePhysicalPageID(PhysicalPageID pageID) {
debug_printf("%s: Freeing physical %u\n", pageFileName.c_str(), pageID);
auto itr = readCounts.find(pageID);
if(itr != readCounts.end()) {
operations.add(waitAndFreePhysicalPageID(this, pageID, itr->second.second.getFuture()));
auto itr = busyPages.find(pageID);
if(itr != busyPages.end()) {
operations.add(waitAndFreePhysicalPageID(this, pageID, itr->second.onUnused.getFuture()));
}
else {
pagerFile.freePage(pageID);
@ -458,7 +460,7 @@ void IndirectShadowPager::writePage(LogicalPageID pageID, Reference<IPage> conte
logPageTableUpdate(pageID, updateVersion, physicalPageID);
checksum(basename, contents, pageID, physicalPageID, true);
checksum(basename, contents->mutate(), IndirectShadowPage::PAGE_BYTES, pageID, physicalPageID, true);
Future<Void> write = holdWhile(contents, dataFile->write(contents->begin(), IndirectShadowPage::PAGE_BYTES, physicalPageID * IndirectShadowPage::PAGE_BYTES));
if(write.isError()) {
@ -566,9 +568,9 @@ void IndirectShadowPager::close() {
shutdown(this, false);
}
ACTOR Future<Reference<const IPage>> getPageImpl(IndirectShadowPager *pager, Reference<IndirectShadowPagerSnapshot> snapshot, LogicalPageID pageID, Version version) {
ASSERT(pageID < pager->pageTable.size());
PageVersionMap &pageVersionMap = pager->pageTable[pageID];
ACTOR Future<Reference<const IPage>> getPageImpl(IndirectShadowPager *pager, Reference<IndirectShadowPagerSnapshot> snapshot, LogicalPageID logicalPageID, Version version) {
ASSERT(logicalPageID < pager->pageTable.size());
PageVersionMap &pageVersionMap = pager->pageTable[logicalPageID];
auto itr = IndirectShadowPager::pageVersionMapUpperBound(pageVersionMap, version);
if(itr == pageVersionMap.begin()) {
@ -578,32 +580,50 @@ ACTOR Future<Reference<const IPage>> getPageImpl(IndirectShadowPager *pager, Ref
--itr;
state PhysicalPageID physicalPageID = itr->second;
debug_printf("%s: Reading logical %d v%lld physical %d mapSize %lu\n", pager->pageFileName.c_str(), pageID, version, physicalPageID, pageVersionMap.size());
debug_printf("%s: Reading logical %d v%lld physical %d mapSize %lu\n", pager->pageFileName.c_str(), logicalPageID, version, physicalPageID, pageVersionMap.size());
ASSERT(physicalPageID != PagerFile::INVALID_PAGE);
++pager->readCounts[physicalPageID].first;
state IndirectShadowPager::BusyPageMapT::iterator i = pager->busyPages.find(physicalPageID);
// Page not in use yet
if(i == pager->busyPages.end() || i->first != physicalPageID) {
i = pager->busyPages.insert(i, {physicalPageID, IndirectShadowPager::BusyPage()});
}
//IndirectShadowPager::BusyPage &bp = pager->busyPages[physicalPageID];
IndirectShadowPager::BusyPage &bp = i->second;
++bp.readerCount;
// We are relying on the use of AsyncFileCached for performance here. We expect that all write actors will complete immediately (with a possible yield()),
// so this wait should either be nonexistent or just a yield.
Void _ = wait(pager->writeActors.signalAndCollapse());
// This causes a crash due to lifetime issues, and isn't necessary when using AsyncFileCached.
//Void _ = wait(pager->writeActors.signalAndCollapse());
//IndirectShadowPager::BusyPage &bp = i->second;
state uint8_t *buf = new (snapshot->arena) uint8_t[IndirectShadowPage::PAGE_BYTES];
// TODO: Use readZeroCopy but fall back ton read(). Releasing pages should releaseZeroCopy for successful zero copy reads
int read = wait(pager->dataFile->read(buf, IndirectShadowPage::PAGE_BYTES, physicalPageID * IndirectShadowPage::PAGE_BYTES));
ASSERT(read == IndirectShadowPage::PAGE_BYTES);
auto readCountItr = pager->readCounts.find(physicalPageID);
ASSERT(readCountItr != pager->readCounts.end());
if(readCountItr->second.first == 1) {
readCountItr->second.second.send(Void());
pager->readCounts.erase(readCountItr);
}
else {
--readCountItr->second.first;
if(bp.readerCount == 1) {
ASSERT(!bp.read.isValid());
debug_printf("PAGE %d: starting read\n", physicalPageID);
state int len = IndirectShadowPage::PAGE_BYTES;
state void *buf = nullptr;
bp.read = map(pager->dataFile->readZeroCopy(&buf, &len, (int64_t) physicalPageID * IndirectShadowPage::PAGE_BYTES),
[=](Void) {
ASSERT(len == IndirectShadowPage::PAGE_BYTES);
checksum(pager->basename, (uint8_t *)buf, len, logicalPageID, physicalPageID, false);
return Reference<IPage>(new IndirectShadowPage((uint8_t *)buf, pager->dataFile, physicalPageID));
});
}
Reference<IPage> p = wait(bp.read);
IndirectShadowPager::BusyPage &bp = i->second;
debug_printf("PAGE %d: read. readerCount %d\n", physicalPageID, bp.readerCount);
--bp.readerCount;
if(bp.readerCount == 0) {
Promise<Void> pUnused = bp.onUnused;
pager->busyPages.erase(i);
pUnused.send(Void());
}
Reference<const IPage> p(new IndirectShadowPage(buf));
checksum(pager->basename, p, pageID, physicalPageID, false);
return p;
}
@ -700,7 +720,7 @@ ACTOR Future<Void> copyPage(IndirectShadowPager *pager, Reference<IPage> page, L
}
ASSERT(bytes == IndirectShadowPage::PAGE_BYTES);
checksum(pager->basename, page, logical, to, true);
checksum(pager->basename, page->mutate(), bytes, logical, to, true);
Void _ = wait(pager->dataFile->write(data, bytes, to * IndirectShadowPage::PAGE_BYTES));
if(zeroCopied) {
pager->dataFile->releaseZeroCopy(data, bytes, from * IndirectShadowPage::PAGE_BYTES);

View File

@ -39,7 +39,8 @@ class IndirectShadowPager;
class IndirectShadowPage : public IPage, ReferenceCounted<IndirectShadowPage> {
public:
IndirectShadowPage();
IndirectShadowPage(uint8_t *data);
IndirectShadowPage(uint8_t *data, Reference<IAsyncFile> file, PhysicalPageID pageID)
: file(file), physicalPageID(pageID), fastAllocated(false), data(data) {}
virtual ~IndirectShadowPage();
virtual void addref() const {
@ -59,8 +60,10 @@ public:
static const int PAGE_OVERHEAD_BYTES;
private:
Reference<IAsyncFile> file;
PhysicalPageID physicalPageID;
bool fastAllocated;
uint8_t *data;
bool allocated;
};
class IndirectShadowPagerSnapshot : public IPagerSnapshot, ReferenceCounted<IndirectShadowPagerSnapshot> {
@ -131,6 +134,8 @@ public:
class IndirectShadowPager : public IPager {
public:
IndirectShadowPager(std::string basename);
virtual ~IndirectShadowPager() {
}
virtual Reference<IPage> newPageBuffer();
virtual int getUsablePageSize();
@ -172,7 +177,16 @@ public:
Future<Void> vacuuming;
Version oldestVersion;
std::map<PhysicalPageID, std::pair<int, Promise<Void>>> readCounts;
struct BusyPage {
BusyPage() : readerCount(0) {}
int readerCount;
Promise<Void> onUnused;
Future<Reference<IPage>> read;
};
typedef std::map<PhysicalPageID, BusyPage> BusyPageMapT;
BusyPageMapT busyPages;
SignalableActorCollection operations;
SignalableActorCollection writeActors;
Future<Void> committing;

View File

@ -437,7 +437,6 @@ public:
// and constructing PathEntry objects which would unnecessarily churn through memory in Arena for storing
// coalesced prefixes.
int pathLen;
Arena arena;
bool valid() const {
return pathLen != 0 && pathBack().valid();

View File

@ -1469,6 +1469,8 @@ Future<T> catchError(Promise<Void> error, Future<T> f) {
class KeyValueStoreRedwoodUnversioned : public IKeyValueStore {
public:
KeyValueStoreRedwoodUnversioned(std::string filePrefix, UID logID) : m_filePrefix(filePrefix) {
// TODO: These implementation-specific things should really be passed in as arguments, and this class should
// be an IKeyValueStore implementation that wraps IVersionedStore.
m_pager = new IndirectShadowPager(filePrefix);
m_tree = new VersionedBTree(m_pager, filePrefix, m_pager->getUsablePageSize());
m_init = catchError(m_error, init_impl(this));
@ -1486,6 +1488,7 @@ public:
}
ACTOR void shutdown(KeyValueStoreRedwoodUnversioned *self, bool dispose) {
TraceEvent(SevInfo, "RedwoodShutdown").detail("FilePrefix", self->m_filePrefix).detail("Dispose", dispose);
self->m_init.cancel();
delete self->m_tree;
Future<Void> closedFuture = self->m_pager->onClosed();
@ -1495,6 +1498,7 @@ public:
self->m_pager->close();
Void _ = wait(closedFuture);
self->m_closed.send(Void());
TraceEvent(SevInfo, "RedwoodShutdownComplete").detail("FilePrefix", self->m_filePrefix).detail("Dispose", dispose);
delete self;
}

View File

@ -3291,6 +3291,9 @@ ACTOR Future<Void> storageServer( IKeyValueStore* persistentData, StorageServerI
self.folder = folder;
try {
Void _ = wait( self.storage.init() );
Void _ = wait( self.storage.commit() );
if (seedTag == invalidTag) {
std::pair<Version, Tag> verAndTag = wait( addStorageServer(self.cx, ssi) ); // Might throw recruitment_failed in case of simultaneous master failure
self.tag = verAndTag.second;

View File

@ -221,7 +221,7 @@ struct KVStoreTestWorkload : TestWorkload {
doCount = getOption( options, LiteralStringRef("count"), false );
filename = getOption( options, LiteralStringRef("filename"), Value() ).toString();
saturation = getOption( options, LiteralStringRef("saturation"), false );
storeType = getOption( options, LiteralStringRef("storeType"), LiteralStringRef("ssd") ).toString();
storeType = getOption( options, LiteralStringRef("storeType"), LiteralStringRef("ssd-redwood-1") ).toString();
}
virtual std::string description() { return "KVStoreTest"; }
virtual Future<Void> setup( Database const& cx ) { return Void(); }
@ -386,6 +386,8 @@ ACTOR Future<Void> testKVStore(KVStoreTestWorkload* workload) {
else
ASSERT(false);
Void _ = wait(test.store->init());
state Future<Void> main = testKVStoreMain( workload, &test );
try {
choose {