Implement checksum via LRU-like approach to save space (#11194)

This commit is contained in:
Hao Fu 2024-02-21 12:24:51 +08:00 committed by GitHub
parent e50cef8e8c
commit 8555ac9b71
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 598 additions and 173 deletions

View File

@ -18,7 +18,200 @@
* limitations under the License.
*/
#include "fdbrpc/AsyncFileWriteChecker.h"
#include "fdbrpc/AsyncFileWriteChecker.actor.h"
#include "flow/UnitTest.h"
// Process-wide budget of checksum-history entries shared across all files;
// populated from FLOW_KNOBS->PAGE_WRITE_CHECKSUM_HISTORY the first time a file is opened.
Optional<int> AsyncFileWriteChecker::checksumHistoryBudget = {};
// Granularity (in bytes) at which page checksums are recorded and verified.
int AsyncFileWriteChecker::checksumHistoryPageSize = 4096;
// Assert that two WriteInfo records carry identical contents (checksum and timestamp).
static void compareWriteInfo(AsyncFileWriteChecker::WriteInfo a, AsyncFileWriteChecker::WriteInfo b) {
	ASSERT(a.checksum == b.checksum);
	ASSERT(a.timestamp == b.timestamp);
}
// Reference LRU implementation: a doubly-linked list with sentinel head/tail
// nodes plus a page -> node hash map. The most recently updated page sits just
// after the head sentinel; the least recently used page sits just before the
// tail sentinel. Used only by the unit test below to cross-check
// AsyncFileWriteChecker::LRU.
class LRU2 {
private:
	struct node {
		uint32_t page;
		AsyncFileWriteChecker::WriteInfo writeInfo;
		node *next, *prev;
		node(uint32_t _page, AsyncFileWriteChecker::WriteInfo _writeInfo) {
			page = _page;
			writeInfo = _writeInfo;
			next = nullptr;
			prev = nullptr;
		}
	};
	node* start; // sentinel head, not a real entry
	node* end; // sentinel tail, not a real entry
	std::unordered_map<uint32_t, node*> m; // page -> list node, for O(1) lookup
	std::string fileName;
	uint32_t maxFullPagePlusOne; // one past the largest page number ever inserted

	// Link n right after the head sentinel, marking it most recently used.
	void insertHead(node* n) {
		n->next = start->next;
		start->next->prev = n;
		n->prev = start;
		start->next = n;
	}

	// Unlink n from the list without freeing it or touching the map.
	void removeFromList(node* n) {
		n->prev->next = n->next;
		n->next->prev = n->prev;
	}

public:
	explicit LRU2(std::string _fileName) {
		fileName = _fileName;
		maxFullPagePlusOne = 0;
		start = new node(0, AsyncFileWriteChecker::WriteInfo());
		end = new node(0, AsyncFileWriteChecker::WriteInfo());
		start->next = end;
		end->prev = start;
	}

	// The list owns raw nodes; forbid copies to avoid double deletion.
	LRU2(const LRU2&) = delete;
	LRU2& operator=(const LRU2&) = delete;

	~LRU2() {
		// Free the sentinels and every remaining entry (previously these were
		// never released, leaking all nodes).
		node* cur = start;
		while (cur != nullptr) {
			node* next = cur->next;
			delete cur;
			cur = next;
		}
	}

	// Insert a new page at the head, or refresh an existing page's recency and content.
	void update(uint32_t page, AsyncFileWriteChecker::WriteInfo writeInfo) {
		auto it = m.find(page);
		if (it != m.end()) {
			node* n = it->second;
			removeFromList(n);
			insertHead(n);
			n->writeInfo = writeInfo;
			return;
		}
		node* n = new node(page, writeInfo);
		insertHead(n);
		m[page] = n;
		if (page >= maxFullPagePlusOne) {
			maxFullPagePlusOne = page + 1;
		}
	}

	// Uniformly random resident page, or uint32_t(-1) when empty.
	uint32_t randomPage() {
		if (m.empty()) {
			return -1;
		}
		auto it = m.begin();
		std::advance(it, deterministicRandom()->randomInt(0, (int)m.size()));
		return it->first;
	}

	// Remove every page >= newMaxFullPagePlusOne (the new page count excludes
	// them). O(previous max page) — which is why the unit test keeps page
	// numbers small.
	void truncate(uint32_t newMaxFullPagePlusOne) {
		for (uint32_t i = newMaxFullPagePlusOne; i < maxFullPagePlusOne; i++) {
			remove(i);
		}
		if (maxFullPagePlusOne > newMaxFullPagePlusOne) {
			maxFullPagePlusOne = newMaxFullPagePlusOne;
		}
	}

	// Debug helper: prints pages from least to most recently used.
	void print() {
		auto it = end->prev;
		while (it != start) {
			printf("%d\t", it->page);
			it = it->prev;
		}
		printf("\n");
	}

	int size() { return m.size(); }

	bool exist(uint32_t page) { return m.find(page) != m.end(); }

	// Stored WriteInfo for page; logs a SevError and returns a default-constructed
	// WriteInfo when the page is absent.
	AsyncFileWriteChecker::WriteInfo find(uint32_t page) {
		auto it = m.find(page);
		if (it == m.end()) {
			TraceEvent(SevError, "LRU2CheckerTryFindingPageNotExist")
			    .detail("FileName", fileName)
			    .detail("Page", page)
			    .log();
			return AsyncFileWriteChecker::WriteInfo();
		}
		return it->second->writeInfo;
	}

	// Page just before the tail sentinel, or uint32_t(-1) when empty.
	uint32_t leastRecentlyUsedPage() {
		if (m.empty()) {
			return -1;
		}
		return end->prev->page;
	}

	void remove(uint32_t page) {
		auto it = m.find(page);
		if (it == m.end()) {
			return;
		}
		node* n = it->second;
		removeFromList(n);
		m.erase(it);
		delete n;
	}
};
// Randomized differential test: AsyncFileWriteChecker::LRU (map-based) vs the
// linked-list LRU2 reference implementation above. Each step performs an
// add/update, a remove, or a truncate on both structures; after every step they
// must agree on membership, stored contents, and the least-recently-used page.
TEST_CASE("/fdbrpc/AsyncFileWriteChecker/LRU") {
	// run 1000 steps, each step either adds, removes, or truncates
	// record the latest add / remove operation for each key
	// then check elements that should exist/have been removed are present/absent
	int i = 0;
	int run = 1000;
	// limit is a small number so that pages can collide between operations
	// also LRU2::truncate has a time complexity of O(size of file), so page numbers cannot be too large
	int limit = 1000;
	AsyncFileWriteChecker::LRU lru("TestLRU");
	LRU2 lru2("TestLRU");
	while (i < run) {
		double r = deterministicRandom()->random01();
		if (lru2.size() == 0 || r > 0.5) {
			// to add/update
			uint32_t page = deterministicRandom()->randomInt(0, limit);
			if (lru2.exist(page)) {
				// the page already exists: both structures must agree on its content
				compareWriteInfo(lru.find(page), lru2.find(page));
			}
			// change the content each time
			AsyncFileWriteChecker::WriteInfo wi;
			wi.checksum = deterministicRandom()->randomInt(0, INT_MAX);
			wi.timestamp = AsyncFileWriteChecker::transformTime(now());
			lru.update(page, wi);
			lru2.update(page, wi);
			compareWriteInfo(lru.find(page), lru2.find(page));
			// printf("ASYNC::Insert %d\n", page);
		} else if (r < 0.45) {
			// to remove a random live page from both structures
			uint32_t page = lru2.randomPage();
			ASSERT(page != -1); // lru2 is non-empty here, so a page must be returned
			ASSERT(lru.exist(page));
			ASSERT(lru2.exist(page));
			compareWriteInfo(lru.find(page), lru2.find(page));
			lru.remove(page);
			lru2.remove(page);
			ASSERT(!lru.exist(page));
			ASSERT(!lru2.exist(page));
			// printf("ASYNC::erase %d\n", page);
		} else {
			// to truncate at a random live page: every page >= the cut must disappear
			uint32_t page = lru2.randomPage();
			uint32_t page2 = lru2.randomPage(); // to ensure pages at/past the cut are gone
			lru.truncate(page);
			lru2.truncate(page);
			if (page2 >= page) {
				ASSERT(!lru.exist(page2));
				ASSERT(!lru2.exist(page2));
			}
			// printf("ASYNC::truncate %d\n", page);
		}
		// lru2.print();
		if (lru2.size() != 0) {
			// both structures must agree on the least recently used page and its content
			uint32_t leastRecentlyPage = lru.leastRecentlyUsedPage();
			uint32_t leastRecentlyPage2 = lru2.leastRecentlyUsedPage();
			ASSERT(leastRecentlyPage == leastRecentlyPage2);
			compareWriteInfo(lru.find(leastRecentlyPage), lru2.find(leastRecentlyPage));
		}
		// printf("Found Page %d, leastRecentlyPage is %d, step is %d\n", page, leastRecentlyPage, it->first);
		i += 1;
	}
	return Void();
}

View File

@ -43,7 +43,7 @@
#include "fdbrpc/AsyncFileKAIO.actor.h"
#include "flow/AsioReactor.h"
#include "flow/Platform.h"
#include "fdbrpc/AsyncFileWriteChecker.h"
#include "fdbrpc/AsyncFileWriteChecker.actor.h"
// Opens a file for asynchronous I/O
Future<Reference<class IAsyncFile>> Net2FileSystem::open(const std::string& filename, int64_t flags, int64_t mode) {

View File

@ -0,0 +1,347 @@
/*
* AsyncFileWriteChecker.actor.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#if defined(NO_INTELLISENSE) && !defined(ASYNC_FILE_WRITE_CHECKER_ACTOR_G_H)
#define ASYNC_FILE_WRITE_CHECKER_ACTOR_G_H
#include "fdbrpc/AsyncFileWriteChecker.actor.g.h"
#elif !defined(ASYNC_FILE_WRITE_CHECKER_ACTOR_H)
#define ASYNC_FILE_WRITE_CHECKER_ACTOR_H
#include "flow/IAsyncFile.h"
#include "crc32/crc32c.h"
#if VALGRIND
#include <memcheck.h>
#endif
#include "flow/actorcompiler.h"
// Milliseconds per second, used by transformTime() to convert unix time
// (seconds, double) to a uint32_t millisecond timestamp. constexpr because this
// lives in a header and must not be a mutable per-translation-unit global.
static constexpr double millisecondsPerSecond = 1000;
// This class checksums reads and writes for the wrapped IAsyncFile.
// It maintains a dynamic data structure that stores recently written pages and their checksums.
// An actor continuously reads back recently written pages to verify their checksums,
// and deletes the corresponding entry upon a successful check to avoid using too much memory.
// Wraps an IAsyncFile and records a crc32c checksum per fully written page.
// Reads over recorded pages verify the stored checksum; a background sweep
// actor continually re-reads the least-recently-written synced page so lost
// writes are detected even for pages the application never reads back.
// Entries are removed after a successful verification to bound memory use.
class AsyncFileWriteChecker : public IAsyncFile, public ReferenceCounted<AsyncFileWriteChecker> {
public:
	void addref() override { ReferenceCounted<AsyncFileWriteChecker>::addref(); }
	void delref() override { ReferenceCounted<AsyncFileWriteChecker>::delref(); }
	virtual StringRef getClassName() override { return "AsyncFileWriteChecker"_sr; }

	// For read() and write(), the data buffer must remain valid until the future is ready
	Future<int> read(void* data, int length, int64_t offset) override {
		// Lambda must hold a reference to this to keep it alive until after the read
		auto self = Reference<AsyncFileWriteChecker>::addRef(this);
		return map(m_f->read(data, length, offset), [self, data, offset](int r) {
			self->updateChecksumHistory(false, offset, r, (uint8_t*)data);
			return r;
		});
	}

	Future<Void> readZeroCopy(void** data, int* length, int64_t offset) override {
		// Lambda must hold a reference to this to keep it alive until after the read
		auto self = Reference<AsyncFileWriteChecker>::addRef(this);
		return map(m_f->readZeroCopy(data, length, offset), [self, data, length, offset](Void r) {
			self->updateChecksumHistory(false, offset, *length, (uint8_t*)data);
			return r;
		});
	}

	Future<Void> write(void const* data, int length, int64_t offset) override {
		// Record checksums for full pages covered by this write. The pages stay
		// marked in 'writing' until the underlying write completes, so the sweep
		// actor does not read them back concurrently.
		auto pages = updateChecksumHistory(true, offset, length, (uint8_t*)data);
		auto self = Reference<AsyncFileWriteChecker>::addRef(this);
		return map(m_f->write(data, length, offset), [self, pages](Void r) {
			for (uint32_t page : pages) {
				self->writing.erase(page);
			}
			return r;
		});
	}

	Future<Void> truncate(int64_t size) override {
		auto self = Reference<AsyncFileWriteChecker>::addRef(this);
		return map(m_f->truncate(size), [self, size](Void r) {
			// Drop history for pages past the new end of file and return the
			// freed entries to the shared budget.
			int maxFullPage = size / checksumHistoryPageSize;
			int oldSize = self->lru.size();
			self->lru.truncate(maxFullPage);
			checksumHistoryBudget.get() += oldSize - self->lru.size();
			return r;
		});
	}

	Future<Void> sync() override {
		auto self = Reference<AsyncFileWriteChecker>::addRef(this);
		return map(m_f->sync(), [self](Void r) {
			// Pages written strictly before this time are durable and become
			// eligible for verification by the sweep actor.
			self->syncedTime = AsyncFileWriteChecker::transformTime(now());
			return r;
		});
	}

	Future<Void> flush() override { return m_f->flush(); }
	Future<int64_t> size() const override { return m_f->size(); }
	std::string getFilename() const override { return m_f->getFilename(); }
	void releaseZeroCopy(void* data, int length, int64_t offset) override {
		return m_f->releaseZeroCopy(data, length, offset);
	}
	int64_t debugFD() const override { return m_f->debugFD(); }

	// Checksum and write time of one page.
	struct WriteInfo {
		WriteInfo() : checksum(0), timestamp(0) {}
		uint32_t checksum;
		uint32_t timestamp; // keep a precision of ms (see transformTime)
	};

	// transform from unixTime(double, seconds) to uint32_t, retaining ms precision
	static uint32_t transformTime(double unixTime) { return (uint32_t)(unixTime * millisecondsPerSecond); }

	// LRU over page -> WriteInfo, ordered by recency via a monotonically
	// increasing step counter. keyToStep is a std::map (not unordered) so that
	// truncate() can iterate all pages >= a given page number via lower_bound.
	class LRU {
	private:
		int64_t step; // monotonically increasing logical clock; larger = more recent
		std::string fileName;
		std::map<int64_t, uint32_t> stepToKey; // step -> page; begin() is the least recently used
		std::map<uint32_t, int64_t> keyToStep; // page -> step; std::map is to support ::truncate
		std::unordered_map<uint32_t, AsyncFileWriteChecker::WriteInfo> pageContents;

	public:
		LRU(std::string _fileName) {
			step = 0;
			fileName = _fileName;
		}

		// Insert a page or refresh an existing page's recency and content.
		void update(uint32_t page, AsyncFileWriteChecker::WriteInfo writeInfo) {
			if (keyToStep.find(page) != keyToStep.end()) {
				// remove its old entry in stepToKey
				stepToKey.erase(keyToStep[page]);
			}
			keyToStep[page] = step;
			stepToKey[step] = page;
			pageContents[page] = writeInfo;
			step++;
		}

		// Remove every page >= page (everything at or past the new end of file).
		void truncate(uint32_t page) {
			auto it = keyToStep.lower_bound(page);
			// iterate through keyToStep, to find corresponding entries in stepToKey
			while (it != keyToStep.end()) {
				int64_t s = it->second;
				auto next = it;
				next++;
				// also drop the stored contents (previously they lingered in
				// pageContents forever after a truncate)
				pageContents.erase(it->first);
				keyToStep.erase(it);
				stepToKey.erase(s);
				it = next;
			}
		}

		// Uniformly random resident page, or uint32_t(-1) when empty.
		uint32_t randomPage() {
			if (keyToStep.size() == 0) {
				return -1;
			}
			auto it = keyToStep.begin();
			std::advance(it, deterministicRandom()->randomInt(0, (int)keyToStep.size()));
			return it->first;
		}

		int size() { return keyToStep.size(); }

		bool exist(uint32_t page) { return keyToStep.find(page) != keyToStep.end(); }

		// Stored WriteInfo for page; logs a SevError and returns a default
		// WriteInfo when the page is not resident.
		AsyncFileWriteChecker::WriteInfo find(uint32_t page) {
			auto it = keyToStep.find(page);
			if (it == keyToStep.end()) {
				TraceEvent(SevError, "LRUCheckerTryFindingPageNotExist")
				    .detail("FileName", fileName)
				    .detail("Page", page)
				    .log();
				return AsyncFileWriteChecker::WriteInfo();
			}
			return pageContents[page];
		}

		// Page with the smallest step, or uint32_t(-1) when empty.
		uint32_t leastRecentlyUsedPage() {
			if (stepToKey.size() == 0) {
				return -1;
			}
			return stepToKey.begin()->second;
		}

		void remove(uint32_t page) {
			if (keyToStep.find(page) == keyToStep.end()) {
				return;
			}
			pageContents.erase(page);
			stepToKey.erase(keyToStep[page]);
			keyToStep.erase(page);
		}
	};

	AsyncFileWriteChecker(Reference<IAsyncFile> f) : m_f(f), lru(f->getFilename()) {
		// Initialize the static history budget the first time (and only the first time) a file is opened.
		if (!checksumHistoryBudget.present()) {
			checksumHistoryBudget = FLOW_KNOBS->PAGE_WRITE_CHECKSUM_HISTORY;
		}
		pageBuffer = (void*)new char[checksumHistoryPageSize];
		totalCheckedSucceed = 0;
		totalCheckedFail = 0;
		// Until the first sync() completes nothing is considered durable, so the
		// sweep actor will not verify (or evict) any page. Previously this member
		// was left uninitialized.
		syncedTime = 0;
		// lru is already constructed with the filename in the member init list.
		checksumWorker = AsyncFileWriteChecker::sweep(this);
		checksumLogger = runChecksumLogger(this);
	}

	~AsyncFileWriteChecker() override {
		// Return this file's remaining entries to the shared budget.
		checksumHistoryBudget.get() += lru.size();
		delete[] reinterpret_cast<char*>(pageBuffer);
	}

private:
	Reference<IAsyncFile> m_f;
	Future<Void> checksumWorker; // sweep() actor; cancelled when this is destroyed
	Future<Void> checksumLogger; // periodic stats logger; cancelled when this is destroyed
	LRU lru;
	void* pageBuffer; // scratch page used by sweep() read-backs
	uint64_t totalCheckedFail, totalCheckedSucceed;
	uint32_t syncedTime; // transformTime() of the most recent completed sync; 0 before the first one
	// to avoid concurrent operation, so that the continuous reader will skip a page if it is being written
	std::unordered_set<uint32_t> writing;
	// This is the most page checksum history blocks we will use across all files.
	static Optional<int> checksumHistoryBudget;
	static int checksumHistoryPageSize;

	// Continuously reads back the least-recently-written page; the read path
	// verifies the checksum and removes the entry on success.
	ACTOR Future<Void> sweep(AsyncFileWriteChecker* self) {
		loop {
			// for each page, read and do checksum
			// scan from the least recently used, thus it is safe to wait here if data has not been synced
			state uint32_t page = self->lru.leastRecentlyUsedPage();
			while (self->writing.find(page) != self->writing.end() || page == -1) {
				// The page is being written concurrently (or there is nothing to
				// check); wait and re-fetch. Re-fetching is required — the
				// previously fetched page might otherwise never leave this state.
				// (The knob's "CHEKCER" spelling matches its definition in FlowKnobs.)
				wait(delay(FLOW_KNOBS->ASYNC_FILE_WRITE_CHEKCER_CHECKING_DELAY));
				page = self->lru.leastRecentlyUsedPage();
			}
			// Widen before multiplying so offsets past 4GiB do not wrap in 32-bit math.
			int64_t offset = (int64_t)page * checksumHistoryPageSize;
			// perform a read to verify checksum, it will remove the entry upon success
			wait(success(self->read(self->pageBuffer, checksumHistoryPageSize, offset)));
		}
	}

	// Periodically logs verification statistics for this file.
	ACTOR Future<Void> runChecksumLogger(AsyncFileWriteChecker* self) {
		state double delayDuration = FLOW_KNOBS->ASYNC_FILE_WRITE_CHEKCER_LOGGING_INTERVAL;
		loop {
			wait(delay(delayDuration));
			// TODO: add more stats, such as total checked, current entries, budget
			TraceEvent("AsyncFileWriteChecker")
			    .detail("Delay", delayDuration)
			    .detail("Filename", self->getFilename())
			    .detail("TotalCheckedSucceed", self->totalCheckedSucceed)
			    .detail("TotalCheckedFail", self->totalCheckedFail)
			    .detail("CurrentSize", self->lru.size());
		}
	}

	// Verify one page against its recorded checksum.
	// Returns true if the caller may keep scanning (page absent, verified, or
	// failed); returns false when the page was written after the last sync — in
	// that case the page cannot be verified yet.
	// Removes the entry from the history (and refunds the budget) on success.
	bool verifyChecksum(uint32_t page, uint32_t checksum, uint8_t* start, bool sweep) {
		if (!lru.exist(page)) {
			// it has already been verified successfully and removed by checksumWorker
			return true;
		}
		WriteInfo history = lru.find(page);
		// only verify checksum for pages that have been synced
		if (history.timestamp < syncedTime) {
			if (history.checksum != checksum) {
				TraceEvent(SevError, "AsyncFileLostWriteDetected")
				    .error(checksum_failed())
				    .detail("Filename", getFilename())
				    .detail("Sweep", sweep)
				    .detail("PageNumber", page)
				    .detail("Size", lru.size())
				    .detail("Start", (long)start)
				    .detail("ChecksumOfPage", checksum)
				    .detail("ChecksumHistory", history.checksum)
				    .detail("SyncedTime", syncedTime / millisecondsPerSecond)
				    .detail("LastWriteTime", history.timestamp / millisecondsPerSecond);
				totalCheckedFail += 1;
			} else {
				checksumHistoryBudget.get() += 1;
				lru.remove(page);
				totalCheckedSucceed += 1;
			}
			return true;
		} else {
			return false;
		}
	}

	// Update (updateChecksum == true, write path) or check (read path)
	// checksum(s) in history for any full pages covered by [offset, offset+len);
	// partial pages at either end are skipped.
	// Returns the pages whose checksums were updated when updateChecksum is true
	// (used by write() to clear the in-flight 'writing' marks), otherwise empty.
	std::vector<uint32_t> updateChecksumHistory(bool updateChecksum,
	                                            int64_t offset,
	                                            int len,
	                                            uint8_t* buf,
	                                            bool sweep = false) {
		std::vector<uint32_t> pages;
		// Check or set each full block in the range
		int page = offset / checksumHistoryPageSize; // First page number
		int slack = offset % checksumHistoryPageSize; // Bytes after most recent page boundary
		uint8_t* start = buf; // Position in buffer to start checking from
		// If offset is not page-aligned, move to next page and adjust start
		if (slack != 0) {
			++page;
			start += (checksumHistoryPageSize - slack);
		}
		int startPage = page;
		int pageEnd = (offset + len) / checksumHistoryPageSize; // Last page plus 1
		while (page < pageEnd) {
			uint32_t checksum = crc32c_append(0xab12fd93, start, checksumHistoryPageSize);
#if VALGRIND
			// It's possible we'll read or write a page where not all of the data is defined, but the checksum of the
			// page is still valid
			VALGRIND_MAKE_MEM_DEFINED_IF_ADDRESSABLE(&checksum, sizeof(uint32_t));
#endif
			// when updateChecksum is true, just update the stored sum and skip checking
			if (updateChecksum) {
				writing.insert(page);
				pages.push_back(page);
				WriteInfo history;
				if (!lru.exist(page)) {
					if (checksumHistoryBudget.get() > 0) {
						checksumHistoryBudget.get() -= 1;
					} else {
						// Out of budget: stop recording checksums for the rest of this write.
						TraceEvent("SkippedPagesDuringUpdateChecksum")
						    .detail("Filename", getFilename())
						    .detail("StartPage", startPage)
						    .detail("CheckedPage", page)
						    .detail("TotalPage", pageEnd);
						break;
					}
				}
				history.timestamp = AsyncFileWriteChecker::transformTime(now());
				history.checksum = checksum;
				lru.update(page, history);
			} else {
				if (!verifyChecksum(page, checksum, start, sweep)) {
					break;
				}
			}
			start += checksumHistoryPageSize;
			++page;
		}
		return pages;
	}
};
#include "flow/unactorcompiler.h"
#endif

View File

@ -1,170 +0,0 @@
/*
* AsyncFileWriteChecker.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "flow/IAsyncFile.h"
#include "crc32/crc32c.h"
#if VALGRIND
#include <memcheck.h>
#endif
// Legacy page-write checksum wrapper for IAsyncFile (this file is the version
// being replaced by AsyncFileWriteChecker.actor.h). Checksums are kept in a
// flat std::vector indexed by page number; writes record a checksum, reads
// verify it, subject to a process-wide history budget.
class AsyncFileWriteChecker : public IAsyncFile, public ReferenceCounted<AsyncFileWriteChecker> {
public:
	void addref() override { ReferenceCounted<AsyncFileWriteChecker>::addref(); }
	void delref() override { ReferenceCounted<AsyncFileWriteChecker>::delref(); }
	virtual StringRef getClassName() override { return "AsyncFileWriteChecker"_sr; }

	// For read() and write(), the data buffer must remain valid until the future is ready
	Future<int> read(void* data, int length, int64_t offset) override {
		// Lambda must hold a reference to this to keep it alive until after the read
		auto self = Reference<AsyncFileWriteChecker>::addRef(this);
		return map(m_f->read(data, length, offset), [self, data, offset](int r) {
			self->updateChecksumHistory(false, offset, r, (uint8_t*)data);
			return r;
		});
	}

	Future<Void> readZeroCopy(void** data, int* length, int64_t offset) override {
		// Lambda must hold a reference to this to keep it alive until after the read
		auto self = Reference<AsyncFileWriteChecker>::addRef(this);
		return map(m_f->readZeroCopy(data, length, offset), [self, data, length, offset](Void r) {
			self->updateChecksumHistory(false, offset, *length, (uint8_t*)data);
			return r;
		});
	}

	// Records checksums for full pages covered by the write, then forwards it.
	Future<Void> write(void const* data, int length, int64_t offset) override {
		updateChecksumHistory(true, offset, length, (uint8_t*)data);
		return m_f->write(data, length, offset);
	}

	Future<Void> truncate(int64_t size) override {
		// Lambda must hold a reference to this to keep it alive until after the truncate
		auto self = Reference<AsyncFileWriteChecker>::addRef(this);
		return map(m_f->truncate(size), [self, size](Void r) {
			// Truncate the page checksum history if it is in use
			if ((size / checksumHistoryPageSize) < self->checksumHistory.size()) {
				int oldCapacity = self->checksumHistory.capacity();
				self->checksumHistory.resize(size / checksumHistoryPageSize);
				// NOTE(review): shrinking with resize() normally leaves capacity()
				// unchanged, so this adjustment is usually zero — confirm the
				// intended budget accounting.
				checksumHistoryBudget.get() -= (self->checksumHistory.capacity() - oldCapacity);
			}
			return r;
		});
	}

	Future<Void> sync() override { return m_f->sync(); }
	Future<Void> flush() override { return m_f->flush(); }
	Future<int64_t> size() const override { return m_f->size(); }
	std::string getFilename() const override { return m_f->getFilename(); }
	void releaseZeroCopy(void* data, int length, int64_t offset) override {
		return m_f->releaseZeroCopy(data, length, offset);
	}
	int64_t debugFD() const override { return m_f->debugFD(); }

	AsyncFileWriteChecker(Reference<IAsyncFile> f) : m_f(f) {
		// Initialize the static history budget the first time (and only the first time) a file is opened.
		if (!checksumHistoryBudget.present()) {
			checksumHistoryBudget = FLOW_KNOBS->PAGE_WRITE_CHECKSUM_HISTORY;
		}

		// Adjust the budget by the initial capacity of history, which should be 0 but maybe not for some
		// implementations.
		checksumHistoryBudget.get() -= checksumHistory.capacity();
	}

	// Return this file's history capacity to the shared budget.
	~AsyncFileWriteChecker() override { checksumHistoryBudget.get() += checksumHistory.capacity(); }

private:
	Reference<IAsyncFile> m_f; // the wrapped file; all operations are forwarded to it

	// Checksum and write time of one page; checksum == 0 means "nothing
	// recorded" (it is also cleared after a reported mismatch).
	struct WriteInfo {
		WriteInfo() : checksum(0), timestamp(0) {}
		uint32_t checksum;
		uint32_t timestamp;
	};

	// Indexed directly by page number; grows on demand up to the shared budget.
	std::vector<WriteInfo> checksumHistory;
	// This is the most page checksum history blocks we will use across all files.
	static Optional<int> checksumHistoryBudget;
	static int checksumHistoryPageSize;

	// Update (write == true) or check checksum(s) in history for any full pages
	// covered by this operation; partial pages at either end are skipped.
	void updateChecksumHistory(bool write, int64_t offset, int len, uint8_t* buf) {
		// Check or set each full block in the the range
		int page = offset / checksumHistoryPageSize; // First page number
		int slack = offset % checksumHistoryPageSize; // Bytes after most recent page boundary
		uint8_t* start = buf; // Position in buffer to start checking from
		// If offset is not page-aligned, move to next page and adjust start
		if (slack != 0) {
			++page;
			start += (checksumHistoryPageSize - slack);
		}
		int pageEnd = (offset + len) / checksumHistoryPageSize; // Last page plus 1

		// Make sure history is large enough or limit pageEnd
		if (checksumHistory.size() < pageEnd) {
			if (checksumHistoryBudget.get() > 0) {
				// Resize history and update budget based on capacity change
				auto initialCapacity = checksumHistory.capacity();
				checksumHistory.resize(checksumHistory.size() +
				                       std::min<int>(checksumHistoryBudget.get(), pageEnd - checksumHistory.size()));
				checksumHistoryBudget.get() -= (checksumHistory.capacity() - initialCapacity);
			}

			// Limit pageEnd to end of history, which works whether or not all of the desired
			// history slots were allocated.
			pageEnd = checksumHistory.size();
		}

		while (page < pageEnd) {
			uint32_t checksum = crc32c_append(0xab12fd93, start, checksumHistoryPageSize);
			WriteInfo& history = checksumHistory[page];
			// printf("%d %d %u %u\n", write, page, checksum, history.checksum);

#if VALGRIND
			// It's possible we'll read or write a page where not all of the data is defined, but the checksum of the
			// page is still valid
			VALGRIND_MAKE_MEM_DEFINED_IF_ADDRESSABLE(&checksum, sizeof(uint32_t));
#endif

			// For writes, just update the stored sum (timestamp keeps ms precision)
			double millisecondsPerSecond = 1000;
			if (write) {
				history.timestamp = (uint32_t)(now() * millisecondsPerSecond);
				history.checksum = checksum;
			} else {
				if (history.checksum != 0 && history.checksum != checksum) {
					// For reads, verify the stored sum if it is not 0. If it fails, clear it.
					TraceEvent(SevError, "AsyncFileLostWriteDetected")
					    .error(checksum_failed())
					    .detail("Filename", m_f->getFilename())
					    .detail("PageNumber", page)
					    .detail("ChecksumOfPage", checksum)
					    .detail("ChecksumHistory", history.checksum)
					    .detail("LastWriteTime", history.timestamp / millisecondsPerSecond);
					history.checksum = 0;
				}
			}

			start += checksumHistoryPageSize;
			++page;
		}
	}
};

View File

@ -61,7 +61,7 @@
#include "fdbrpc/Net2FileSystem.h"
#include "fdbrpc/Replication.h"
#include "fdbrpc/ReplicationUtils.h"
#include "fdbrpc/AsyncFileWriteChecker.h"
#include "fdbrpc/AsyncFileWriteChecker.actor.h"
#include "fdbrpc/genericactors.actor.h"
#include "flow/FaultInjection.h"
#include "flow/TaskQueue.h"

View File

@ -187,6 +187,10 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
init( NON_DURABLE_MAX_WRITE_DELAY, 2.0 ); if( randomize && BUGGIFY ) NON_DURABLE_MAX_WRITE_DELAY = 5.0;
init( MAX_PRIOR_MODIFICATION_DELAY, 1.0 ); if( randomize && BUGGIFY ) MAX_PRIOR_MODIFICATION_DELAY = 10.0;
//AsyncFileWriteChecker
init( ASYNC_FILE_WRITE_CHEKCER_LOGGING_INTERVAL, 60.0 );
init( ASYNC_FILE_WRITE_CHEKCER_CHECKING_DELAY, 5.0 );
//GenericActors
init( BUGGIFY_FLOW_LOCK_RELEASE_DELAY, 1.0 );
init( LOW_PRIORITY_DELAY_COUNT, 5 );

View File

@ -252,6 +252,10 @@ public:
double NON_DURABLE_MAX_WRITE_DELAY;
double MAX_PRIOR_MODIFICATION_DELAY;
// AsyncFileWriteChecker
double ASYNC_FILE_WRITE_CHEKCER_LOGGING_INTERVAL;
double ASYNC_FILE_WRITE_CHEKCER_CHECKING_DELAY;
// GenericActors
double BUGGIFY_FLOW_LOCK_RELEASE_DELAY;
int LOW_PRIORITY_DELAY_COUNT;

View File

@ -0,0 +1,47 @@
#include "benchmark/benchmark.h"
#include "crc32/crc32c.h"
#include "flow/Hash3.h"
#include "flow/xxhash.h"
#include "flowbench/GlobalData.h"
#include "fdbrpc/AsyncFileWriteChecker.actor.h"
#include <stdint.h>
// Micro-benchmark for AsyncFileWriteChecker::LRU: a random mix of updates
// (~50%), removes (~45%) and truncates (~5%) over a page space sized like a
// 600GB file.
static void lru_test(benchmark::State& state) {
	constexpr int kOpsPerIteration = 10000;
	constexpr int kPageLimit = 150000000; // 600GB file
	std::set<uint32_t> present;
	AsyncFileWriteChecker::LRU lru("TestLRU");
	// Benchmark
	for (auto _ : state) {
		for (int op = 0; op < kOpsPerIteration; ++op) {
			double roll = deterministicRandom()->random01();
			// roll in [0, 0.45): remove; [0.45, 0.5): truncate; [0.5, 1]: update.
			// With fewer than two live pages we always update, so the other
			// branches always have something to act on.
			if (present.size() < 2 || roll > 0.5) {
				// add/update a random page, varying the content every time
				uint32_t page = deterministicRandom()->randomInt(0, kPageLimit);
				auto wi = AsyncFileWriteChecker::WriteInfo();
				wi.timestamp = op;
				lru.update(page, wi);
				present.insert(page);
			} else if (roll < 0.45) {
				// remove a uniformly random live page
				auto victim = present.begin();
				std::advance(victim, deterministicRandom()->randomInt(0, (int)present.size()));
				lru.remove(*victim);
				present.erase(victim);
			} else {
				// truncate at a page from the first half, dropping it and everything above
				auto cut = present.begin();
				std::advance(cut, deterministicRandom()->randomInt(0, (int)present.size() / 2));
				lru.truncate(*cut);
				present.erase(cut, present.end());
			}
		}
	}
}
BENCHMARK(lru_test);