Added counters for page writes, sets, and commits.

This commit is contained in:
Stephen Atherton 2018-12-05 22:41:04 -08:00
parent dbaaaac354
commit 534e50de71
2 changed files with 38 additions and 2 deletions

View File

@ -447,6 +447,29 @@ public:
static Key beginKey;
static Key endKey;
struct Counts {
Counts() {
memset(this, 0, sizeof(Counts));
}
void clear() {
*this = Counts();
}
int64_t pageWrites;
int64_t blockWrites;
int64_t sets;
int64_t clears;
int64_t commits;
std::string toString() const {
std::string s = format("sets=%lld clears=%lld commits=%lld pages=%lld blocks=%lld\n", sets, clears, commits, pageWrites, blockWrites);
return s;
}
};
Counts counts;
// All async opts on the btree are based on pager reads, writes, and commits, so
// we can mostly forward these next few functions to the pager
virtual Future<Void> getError() {
@ -484,6 +507,7 @@ public:
// A write is considered part of (a change leading to) the version determined by the previous call to setWriteVersion()
// A write shall not become durable until the following call to commit() begins, and shall be durable once the following call to commit() returns
virtual void set(KeyValueRef keyValue) {
++counts.sets;
SingleKeyMutationsByVersion &changes = insertMutationBoundary(keyValue.key)->second.startKeyMutations;
// Add the set if the changes set is empty or the last entry isn't a set to exactly the same value
@ -492,6 +516,7 @@ public:
}
}
virtual void clear(KeyRangeRef range) {
++counts.clears;
MutationBufferT::iterator iBegin = insertMutationBoundary(range.begin);
MutationBufferT::iterator iEnd = insertMutationBoundary(range.end);
@ -818,6 +843,8 @@ private:
debug_printf("%p: writePages(): Writing %lu replacement pages for %d at version %lld\n", actor_debug, pages.size(), originalID, version);
for(int i=0; i<pages.size(); i++) {
++counts.pageWrites;
// Allocate page number for main page first
LogicalPageID id = primaryLogicalPageIDs[i];
@ -834,6 +861,7 @@ private:
newPage->extensionPages[e] = eid;
// If replacing the primary page below (version == 0) then pass the primary page's ID as the reference page ID
m_pager->writePage(eid, extPages[e], version, (version == 0) ? id : invalidLogicalPageID);
++counts.blockWrites;
}
debug_printf("%p: writePages(): Writing primary page op=write id=%u @%lld (+%lu extension pages)\n", actor_debug, id, version, extPages.size());
@ -842,6 +870,7 @@ private:
else {
debug_printf("%p: writePages(): Writing normal page op=write id=%u @%lld\n", actor_debug, id, version);
writePage(id, pages[i].firstPage, version, pages[i].lowerBound, (i == pages.size() - 1) ? upperBound : pages[i + 1].lowerBound);
++counts.blockWrites;
}
}
@ -1330,6 +1359,7 @@ private:
self->m_mutationBuffers.erase(self->m_mutationBuffers.begin());
self->m_lastCommittedVersion = writeVersion;
++self->counts.commits;
committed.send(Void());
return Void();
@ -2428,15 +2458,18 @@ TEST_CASE("!/redwood/performance/set") {
++records;
}
if(g_random->random01() < (1.0 / 300)) {
if(g_random->random01() < (1.0 / 1000)) {
wait(commit);
commit = btree->commit();
wait(commit);
double elapsed = now() - startTime;
printf("Committed (cumulative) %lld bytes in %d records in %f seconds, %.2f MB/s\n", kvBytes, records, elapsed, kvBytes / elapsed / 1e6);
printf("Committed (cumulative) %lld bytes in %d records in %f seconds, %.2f MB/s %s\n", kvBytes, records, elapsed, kvBytes / elapsed / 1e6, btree->counts.toString().c_str());
}
}
wait(btree->commit());
double elapsed = now() - startTime;
printf("Committed (cumulative) %lld bytes in %d records in %f seconds, %.2f MB/s %s\n", kvBytes, records, elapsed, kvBytes / elapsed / 1e6, btree->counts.toString().c_str());
Future<Void> closedFuture = btree->onClosed();
btree->close();

View File

@ -2656,6 +2656,8 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
state Version newOldestVersion = data->storageVersion();
state Version desiredVersion = data->desiredOldestVersion.get();
state int64_t bytesLeft = SERVER_KNOBS->STORAGE_COMMIT_BYTES;
// Write mutations to storage until we reach the desiredVersion or have written too much (bytesleft)
loop {
state bool done = data->storage.makeVersionMutationsDurable(newOldestVersion, desiredVersion, bytesLeft);
// We want to forget things from these data structures atomically with changing oldestVersion (and "before", since oldestVersion.set() may trigger waiting actors)
@ -2667,6 +2669,7 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
if (done) break;
}
// Set the new durable version as part of the outstanding change set, before commit
if (startOldestVersion != newOldestVersion)
data->storage.makeVersionDurable( newOldestVersion );